mirror of
https://github.com/SpudGunMan/meshing-around.git
synced 2026-03-28 17:32:36 +01:00
Compare commits
45 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0e275a49c | ||
|
|
bf39c2f088 | ||
|
|
34d36057c1 | ||
|
|
4e1d1de883 | ||
|
|
97f103dfd7 | ||
|
|
47089871b1 | ||
|
|
cc7ef129f6 | ||
|
|
0fa5d06a3a | ||
|
|
7fc44ec06e | ||
|
|
184760096e | ||
|
|
8868d10388 | ||
|
|
1ce2ecd75c | ||
|
|
69e1c21488 | ||
|
|
97a2ffce7b | ||
|
|
4c0d3a597e | ||
|
|
094f7e61a0 | ||
|
|
a54ecaa5a1 | ||
|
|
bd12392d69 | ||
|
|
882bcf3f4b | ||
|
|
c0d0ca3743 | ||
|
|
d74d848646 | ||
|
|
2afb915b56 | ||
|
|
d5e48bead1 | ||
|
|
3c80848f61 | ||
|
|
64345fe47a | ||
|
|
32f734d69b | ||
|
|
aa6de00c5b | ||
|
|
6df4ba5756 | ||
|
|
a11a2780db | ||
|
|
980414f872 | ||
|
|
f26334d625 | ||
|
|
24546b28d6 | ||
|
|
f33da848cd | ||
|
|
57ce15de4e | ||
|
|
b8886e0662 | ||
|
|
9a1e86f25e | ||
|
|
fa8021ab5a | ||
|
|
f3917f1c3d | ||
|
|
c1443048fd | ||
|
|
da430557f3 | ||
|
|
84152bda65 | ||
|
|
b6e80ae576 | ||
|
|
18ac26864c | ||
|
|
b661fbc750 | ||
|
|
3049d18663 |
10
compose.yaml
10
compose.yaml
@@ -19,6 +19,16 @@ services:
|
||||
networks:
|
||||
- meshing-around-network
|
||||
|
||||
test-bot:
|
||||
image: ghcr.io/spudgunman/meshing-around:main
|
||||
container_name: test-bot
|
||||
command: ["/bin/bash", "-c", "python3 modules/test_bot.py | tee /tmp/test_tmp.txt; if grep -E 'failures=|errors=' /tmp/test_tmp.txt; then cp /tmp/test_tmp.txt /app/test_results.txt; fi"]
|
||||
volumes:
|
||||
- .:/app:rw
|
||||
networks:
|
||||
- meshing-around-network
|
||||
stdin_open: true
|
||||
|
||||
debug-console:
|
||||
image: ghcr.io/spudgunman/meshing-around:main
|
||||
container_name: debug-console
|
||||
|
||||
@@ -57,9 +57,9 @@ spaceWeather = True
|
||||
|
||||
# enable or disable the RSS module, and truncate the story
|
||||
rssEnable = True
|
||||
rssFeedURL = http://www.hackaday.com/rss.xml,http://rss.slashdot.org/Slashdot/slashdotMain
|
||||
rssFeedURL = http://www.hackaday.com/rss.xml,http://rss.slashdot.org/Slashdot/slashdotMain,http://www.reddit.com/r/meshtastic/.rss
|
||||
# RSS feed names must match the order of the URLs above, default is used if no match
|
||||
rssFeedNames = default,slashdot
|
||||
rssFeedNames = default,slashdot,mesh
|
||||
rssMaxItems = 3
|
||||
rssTruncate = 100
|
||||
|
||||
|
||||
78
mesh_bot.py
78
mesh_bot.py
@@ -103,7 +103,7 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n
|
||||
"whereami": lambda: handle_whereami(message_from_id, deviceID, channel_number),
|
||||
"whoami": lambda: handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus),
|
||||
"whois": lambda: handle_whois(message, deviceID, channel_number, message_from_id),
|
||||
"wiki:": lambda: handle_wiki(message, isDM),
|
||||
"wiki": lambda: handle_wiki(message, isDM),
|
||||
"wx": lambda: handle_wxc(message_from_id, deviceID, 'wx'),
|
||||
"wxa": lambda: handle_wxalert(message_from_id, deviceID, message),
|
||||
"wxalert": lambda: handle_wxalert(message_from_id, deviceID, message),
|
||||
@@ -491,19 +491,16 @@ def handle_howtall(message, message_from_id, deviceID, isDM):
|
||||
|
||||
def handle_wiki(message, isDM):
|
||||
# location = get_node_location(message_from_id, deviceID)
|
||||
msg = "Wikipedia search function. \nUsage example:📲wiki: travelling gnome"
|
||||
try:
|
||||
if "wiki:?" in message.lower() or "wiki: ?" in message.lower() or "wiki?" in message.lower() or "wiki ?" in message.lower():
|
||||
return msg
|
||||
if "wiki" in message.lower():
|
||||
search = message.split(":")[1]
|
||||
search = search.strip()
|
||||
if search:
|
||||
return get_wikipedia_summary(search)
|
||||
return "Please add a search term example:📲wiki: travelling gnome"
|
||||
except Exception as e:
|
||||
logger.error(f"System: Wiki Exception {e}")
|
||||
msg = "Error processing your request"
|
||||
msg = "Wikipedia search function. \nUsage example:📲wiki travelling gnome"
|
||||
if "?" in message.lower():
|
||||
return msg
|
||||
if "wiki" in message.lower():
|
||||
parts = message.split(" ", 1)
|
||||
if len(parts) < 2 or not parts[1].strip():
|
||||
return "Please add a search term example:📲wiki travelling gnome"
|
||||
search = parts[1].strip()
|
||||
if search:
|
||||
return get_wikipedia_summary(search)
|
||||
|
||||
return msg
|
||||
|
||||
@@ -629,7 +626,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
|
||||
return response
|
||||
|
||||
def handleDopeWars(message, nodeID, rxNode):
|
||||
from modules.settings import dwPlayerTracker
|
||||
global dwPlayerTracker
|
||||
global dwHighScore
|
||||
|
||||
# Find player in tracker
|
||||
@@ -682,7 +679,7 @@ def handle_gTnW(chess = False):
|
||||
return response[selected_index]
|
||||
|
||||
def handleLemonade(message, nodeID, deviceID):
|
||||
from modules.settings import lemonadeTracker
|
||||
global lemonadeTracker
|
||||
global lemonadeCups, lemonadeLemons, lemonadeSugar, lemonadeWeeks, lemonadeScore, lemon_starting_cash, lemon_total_weeks
|
||||
msg = ""
|
||||
|
||||
@@ -730,9 +727,8 @@ def handleLemonade(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleBlackJack(message, nodeID, deviceID):
|
||||
from modules.settings import jackTracker
|
||||
global jackTracker
|
||||
msg = ""
|
||||
|
||||
# Find player in tracker
|
||||
player = next((p for p in jackTracker if p['nodeID'] == nodeID), None)
|
||||
|
||||
@@ -786,7 +782,7 @@ def handleBlackJack(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleVideoPoker(message, nodeID, deviceID):
|
||||
from modules.settings import vpTracker
|
||||
global vpTracker
|
||||
msg = ""
|
||||
|
||||
# Find player in tracker
|
||||
@@ -801,7 +797,17 @@ def handleVideoPoker(message, nodeID, deviceID):
|
||||
|
||||
# Create new player if not found
|
||||
if not player and nodeID != 0:
|
||||
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'last_played': time.time()})
|
||||
vpTracker.append({
|
||||
'nodeID': nodeID,
|
||||
'cmd': 'new',
|
||||
'last_played': time.time(),
|
||||
'time': time.time(),
|
||||
'cash': vpStartingCash,
|
||||
'player': None,
|
||||
'deck': None,
|
||||
'highScore': 0,
|
||||
'drawCount': 0
|
||||
})
|
||||
msg += "Welcome to 🎰Video Poker!🎰\n"
|
||||
# Show high score if available
|
||||
highScore = loadHSVp()
|
||||
@@ -821,7 +827,7 @@ def handleVideoPoker(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleMmind(message, nodeID, deviceID):
|
||||
from modules.settings import mindTracker
|
||||
global mindTracker
|
||||
msg = ''
|
||||
|
||||
if "end" in message.lower() or message.lower().startswith("e"):
|
||||
@@ -865,7 +871,7 @@ def handleMmind(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleGolf(message, nodeID, deviceID):
|
||||
from modules.settings import golfTracker
|
||||
global golfTracker
|
||||
msg = ''
|
||||
|
||||
# get player's last command from tracker if not new player
|
||||
@@ -913,7 +919,7 @@ def handleGolf(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleHangman(message, nodeID, deviceID):
|
||||
from modules.settings import hangmanTracker
|
||||
global hangmanTracker
|
||||
index = 0
|
||||
msg = ''
|
||||
for i in range(len(hangmanTracker)):
|
||||
@@ -939,7 +945,7 @@ def handleHangman(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleHamtest(message, nodeID, deviceID):
|
||||
from modules.settings import hamtestTracker
|
||||
global hamtestTracker
|
||||
index = 0
|
||||
msg = ''
|
||||
response = message.split(' ')
|
||||
@@ -972,7 +978,7 @@ def handleHamtest(message, nodeID, deviceID):
|
||||
return msg
|
||||
|
||||
def handleTicTacToe(message, nodeID, deviceID):
|
||||
from modules.settings import tictactoeTracker
|
||||
global tictactoeTracker
|
||||
index = 0
|
||||
msg = ''
|
||||
|
||||
@@ -1964,17 +1970,17 @@ async def start_rx():
|
||||
|
||||
# Initialize game trackers
|
||||
gameTrackers = [
|
||||
(dwPlayerTracker, "DopeWars", handleDopeWars) if 'dwPlayerTracker' in globals() else None,
|
||||
(lemonadeTracker, "LemonadeStand", handleLemonade) if 'lemonadeTracker' in globals() else None,
|
||||
(vpTracker, "VideoPoker", handleVideoPoker) if 'vpTracker' in globals() else None,
|
||||
(jackTracker, "BlackJack", handleBlackJack) if 'jackTracker' in globals() else None,
|
||||
(mindTracker, "MasterMind", handleMmind) if 'mindTracker' in globals() else None,
|
||||
(golfTracker, "GolfSim", handleGolf) if 'golfTracker' in globals() else None,
|
||||
(hangmanTracker, "Hangman", handleHangman) if 'hangmanTracker' in globals() else None,
|
||||
(hamtestTracker, "HamTest", handleHamtest) if 'hamtestTracker' in globals() else None,
|
||||
(tictactoeTracker, "TicTacToe", handleTicTacToe) if 'tictactoeTracker' in globals() else None,
|
||||
(surveyTracker, "Survey", surveyHandler) if 'surveyTracker' in globals() else None,
|
||||
#quiz does not use a tracker (quizGamePlayer) always active
|
||||
(dwPlayerTracker, "DopeWars", handleDopeWars),
|
||||
(lemonadeTracker, "LemonadeStand", handleLemonade),
|
||||
(vpTracker, "VideoPoker", handleVideoPoker),
|
||||
(jackTracker, "BlackJack", handleBlackJack),
|
||||
(mindTracker, "MasterMind", handleMmind),
|
||||
(golfTracker, "GolfSim", handleGolf),
|
||||
(hangmanTracker, "Hangman", handleHangman),
|
||||
(hamtestTracker, "HamTest", handleHamtest),
|
||||
(tictactoeTracker, "TicTacToe", handleTicTacToe),
|
||||
(surveyTracker, "Survey", surveyHandler),
|
||||
# quiz does not use a tracker (quizGamePlayer) always active
|
||||
]
|
||||
|
||||
# Hello World
|
||||
|
||||
@@ -262,7 +262,7 @@ Configure in `[ollama]` section of `config.ini`.
|
||||
|
||||
| Command | Description |
|
||||
|--------------|-----------------------------------------------|
|
||||
| `wiki:` | Search Wikipedia or local Kiwix server |
|
||||
| `wiki` | Search Wikipedia or local Kiwix server |
|
||||
|
||||
Configure in `[wikipedia]` section of `config.ini`.
|
||||
|
||||
|
||||
@@ -129,4 +129,79 @@ This will call the default script located at `script/runShell.sh` and return its
|
||||
|
||||
---
|
||||
|
||||
|
||||
## Overview Unit Tests
|
||||
|
||||
Your test_bot.py file contains a comprehensive suite of unit tests for the various modules the project. The tests are organized using Python’s `unittest` framework and cover both core utility modules and all major game modules.
|
||||
|
||||
---
|
||||
|
||||
## Structure
|
||||
|
||||
- **Imports & Setup:**
|
||||
The script sets up the environment, imports all necessary modules, and suppresses certain warnings for clean test output.
|
||||
|
||||
- **TestBot Class:**
|
||||
All tests are methods of the `TestBot` class, which inherits from `unittest.TestCase`.
|
||||
|
||||
---
|
||||
|
||||
## Core Module Tests
|
||||
|
||||
- **Database & Checklist:**
|
||||
- `test_load_bbsdb`, `test_bbs_list_messages`, `test_initialize_checklist_database`
|
||||
- **News & Alerts:**
|
||||
- `test_init_news_sources`, `test_get_nina_alerts`
|
||||
- **LLM & Wikipedia:**
|
||||
- `test_llmTool_get_google`, `test_send_ollama_query`, `test_get_wikipedia_summary`, `test_get_kiwix_summary`
|
||||
- **Space & Weather:**
|
||||
- `test_get_moon_phase`, `test_get_sun_times`, `test_hf_band_conditions`
|
||||
- **Radio & Location:**
|
||||
- `test_get_hamlib`, `test_get_rss_feed`, `get_openskynetwork`, `test_initalize_qrz_database`
|
||||
|
||||
---
|
||||
|
||||
## Game Module Tests
|
||||
|
||||
Each game module has a dedicated test that simulates a typical user interaction:
|
||||
|
||||
- **Tic-Tac-Toe:**
|
||||
Starts a game and makes one move.
|
||||
- **Video Poker:**
|
||||
Starts a session and places a bet.
|
||||
- **Blackjack:**
|
||||
Starts a game and places a bet.
|
||||
- **Hangman:**
|
||||
Starts a game and guesses a letter.
|
||||
- **Lemonade Stand:**
|
||||
Starts a game and buys a box of cups.
|
||||
- **GolfSim:**
|
||||
Starts a hole and takes a shot.
|
||||
- **DopeWars:**
|
||||
Starts a game, selects a city, and checks the list.
|
||||
- **MasterMind:**
|
||||
Starts a game and makes one guess.
|
||||
- **Quiz:**
|
||||
Starts a quiz, joins as a player, answers one question, and ends the quiz.
|
||||
- **Survey:**
|
||||
Starts a survey, answers one question, and ends the survey.
|
||||
- **HamTest:**
|
||||
Starts a ham radio test and answers one question.
|
||||
|
||||
---
|
||||
|
||||
## Extended API Tests
|
||||
|
||||
If the `.checkall` file is present, additional API and data-fetching tests are run for:
|
||||
- RepeaterBook, ArtSciRepeaters, NOAA tides/weather, USGS earthquakes/volcanoes, satellite passes, and more.
|
||||
|
||||
## Notes
|
||||
|
||||
- Tests are designed to be **non-destructive** and **idempotent**.
|
||||
- Some tests require specific data files (e.g., for quiz, survey, hamtest).
|
||||
- The suite is intended to be run from the main program directory.
|
||||
|
||||
|
||||
|
||||
|
||||
Happy hacking!
|
||||
@@ -21,30 +21,33 @@ bbs_dm = []
|
||||
|
||||
def load_bbsdb():
|
||||
global bbs_messages
|
||||
# load the bbs messages from the database file
|
||||
try:
|
||||
with open('data/bbsdb.pkl', 'rb') as f:
|
||||
new_bbs_messages = pickle.load(f)
|
||||
if isinstance(new_bbs_messages, list):
|
||||
for msg in new_bbs_messages:
|
||||
#example [1, 'Welcome to meshBBS', 'Welcome to the BBS, please post a message!', 0]
|
||||
msgHash = hash(tuple(msg[1:3])) # Create a hash of the message content (subject and body)
|
||||
# Check if the message already exists in bbs_messages
|
||||
msgHash = hash(tuple(msg[1:3]))
|
||||
if all(hash(tuple(existing_msg[1:3])) != msgHash for existing_msg in bbs_messages):
|
||||
# if the message is not a duplicate, add it to bbs_messages Maintain the message ID sequence
|
||||
new_id = len(bbs_messages) + 1
|
||||
bbs_messages.append([new_id, msg[1], msg[2], msg[3]])
|
||||
logger.info(f"System: Loaded BBS Message ID {new_id}, subject: {msg[1]} from bbsdb.pkl")
|
||||
return True # Loaded successfully, regardless of whether new messages were added
|
||||
return False # File existed but did not contain a valid list of messages (possibly corrupted)
|
||||
except FileNotFoundError:
|
||||
# create a new bbsdb.pkl with a welcome message
|
||||
# template ([messageID, subject, message, fromNode, now, thread, replyto])
|
||||
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0,time.strftime('%Y-%m-%d %H:%M:%S'),0,0]]
|
||||
logger.debug("System: bbsdb.pkl not found, creating new one")
|
||||
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
|
||||
try:
|
||||
with open('data/bbsdb.pkl', 'wb') as f:
|
||||
pickle.dump(bbs_messages, f)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"System: Error creating bbsdb.pkl: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"System: Error loading bbsdb.pkl: {e}")
|
||||
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
|
||||
return False
|
||||
|
||||
def save_bbsdb():
|
||||
global bbs_messages
|
||||
|
||||
@@ -8,18 +8,21 @@ import time
|
||||
trap_list_checklist = ("checkin", "checkout", "checklist", "purgein", "purgeout")
|
||||
|
||||
def initialize_checklist_database():
|
||||
# create the database
|
||||
conn = sqlite3.connect(checklist_db)
|
||||
c = conn.cursor()
|
||||
# Check if the checkin table exists, and create it if it doesn't
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS checkin
|
||||
(checkin_id INTEGER PRIMARY KEY, checkin_name TEXT, checkin_date TEXT, checkin_time TEXT, location TEXT, checkin_notes TEXT)''')
|
||||
# Check if the checkout table exists, and create it if it doesn't
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS checkout
|
||||
(checkout_id INTEGER PRIMARY KEY, checkout_name TEXT, checkout_date TEXT, checkout_time TEXT, location TEXT, checkout_notes TEXT)''')
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logger.debug("System: Ensured data/checklist.db exists with required tables")
|
||||
try:
|
||||
conn = sqlite3.connect(checklist_db)
|
||||
c = conn.cursor()
|
||||
# Check if the checkin table exists, and create it if it doesn't
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS checkin
|
||||
(checkin_id INTEGER PRIMARY KEY, checkin_name TEXT, checkin_date TEXT, checkin_time TEXT, location TEXT, checkin_notes TEXT)''')
|
||||
# Check if the checkout table exists, and create it if it doesn't
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS checkout
|
||||
(checkout_id INTEGER PRIMARY KEY, checkout_name TEXT, checkout_date TEXT, checkout_time TEXT, location TEXT, checkout_notes TEXT)''')
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Checklist: Failed to initialize database: {e}")
|
||||
return False
|
||||
|
||||
def checkin(name, date, time, location, notes):
|
||||
location = ", ".join(map(str, location))
|
||||
|
||||
@@ -178,6 +178,9 @@ def initNewsSources():
|
||||
if file.endswith('_news.txt'):
|
||||
source = file[:-9] # remove _news.txt
|
||||
newsSourcesList.append(source)
|
||||
return True
|
||||
logger.info("FileMon: No news sources found")
|
||||
return False
|
||||
|
||||
#initialize the headlines on startup
|
||||
initNewsSources()
|
||||
|
||||
@@ -9,8 +9,10 @@
|
||||
- [Tic-Tac-Toe](#tic-tac-toe-game-module)
|
||||
- [MasterMind](#mastermind-game-module)
|
||||
- [Video Poker](#video-poker-game-module)
|
||||
- [Hangman](#hangman-game-module)
|
||||
- [Quiz](#quiz-game-module)
|
||||
- [Survey](#survey--module-game)
|
||||
- [Word of the Day Game](#word-of-the-day-game--rules--features)
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -538,4 +540,182 @@ Place your Bet, or (L)eave Table.
|
||||
"turtle",
|
||||
"lizard",
|
||||
"snake"
|
||||
]
|
||||
]
|
||||
|
||||
# Hangman Game Module
|
||||
|
||||
A classic word-guessing game for the Meshtastic mesh-bot. Try to guess the hidden word one letter at a time before you run out of chances!
|
||||
|
||||
## How to Play
|
||||
|
||||
- **Start the Game:**
|
||||
Send the command `hangman` via DM to the bot to begin a new game.
|
||||
|
||||
- **Objective:**
|
||||
Guess the secret word by suggesting letters, one at a time. Each incorrect guess brings you closer to losing!
|
||||
|
||||
- **Game Flow:**
|
||||
1. **New Game:**
|
||||
- The bot picks a random word and shows you its masked form (e.g., `_ _ _ _ _`).
|
||||
- You’ll see your total games played and games won.
|
||||
2. **Guessing:**
|
||||
- Type a single letter to guess.
|
||||
- Correct guesses reveal all instances of that letter in the word.
|
||||
- Incorrect guesses are tracked; you have 6 chances before the game ends.
|
||||
- The bot shows your progress, wrong guesses, and a hangman emoji status.
|
||||
3. **Winning & Losing:**
|
||||
- Guess all letters before reaching 6 wrong guesses to win!
|
||||
- If you lose, the bot reveals the word and starts a new game.
|
||||
|
||||
- **Commands:**
|
||||
- Enter a single letter to guess.
|
||||
- Start a new game by sending `hangman` again.
|
||||
|
||||
## Example Session
|
||||
|
||||
```
|
||||
_ _ _ _ _ _ _
|
||||
Guess a letter
|
||||
|
||||
|
||||
🥳
|
||||
Total Games: 1, Won: 1
|
||||
M E S H T A S T I C
|
||||
Guess a letter
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
- The word list is loaded from `data/hangman.json` if available, or uses a built-in default list. [\"apple\",\"banana\",\"cherry\"]
|
||||
- Game stats are tracked per player.
|
||||
- Only one game session per player at a time.
|
||||
- Play via DM for best experience.
|
||||
|
||||
## Data Files
|
||||
|
||||
- `data/hangman.json`: List of words for Hangman.
|
||||
Example:
|
||||
```
|
||||
[
|
||||
"apple",
|
||||
"banana",
|
||||
"cherry"
|
||||
]
|
||||
```
|
||||
|
||||
## Credits
|
||||
|
||||
- Written for Meshtastic mesh-bot by ZR1RF Johannes le Roux 2025
|
||||
|
||||
# Quiz Game Module
|
||||
|
||||
This module implements a multiplayer quiz game for the Meshtastic mesh-bot.
|
||||
|
||||
## How to Play
|
||||
|
||||
- **Start the Game:**
|
||||
The quizmaster starts the quiz session (usually with `/quiz start` or similar command).
|
||||
- **Join the Game:**
|
||||
Players join by sending `/quiz join` or by answering a question while a quiz is active.
|
||||
- **Answer Questions:**
|
||||
- Use `Q: <answer>` to answer the current question.
|
||||
- For multiple choice, answer with `A`, `B`, `C`, etc.
|
||||
- For free-text, type the answer after `Q: `.
|
||||
- Use `Q: ?` to request the next question.
|
||||
- **Leave the Game:**
|
||||
Players can leave at any time with `/quiz leave`.
|
||||
- **Stop the Game:**
|
||||
The quizmaster stops the quiz session (e.g., `/quiz stop`). Final scores and the top 3 players are announced.
|
||||
|
||||
## Rules & Features
|
||||
|
||||
- Only the quizmaster can start or stop the quiz.
|
||||
- Players can join or leave at any time while the quiz is active.
|
||||
- Questions are loaded from quiz_questions.json and can be multiple choice or free-text.
|
||||
- Players earn 1 point for each correct answer.
|
||||
- The first player to answer each question correctly is noted.
|
||||
- The top 3 players are displayed at the end of the quiz.
|
||||
- The quizmaster can broadcast messages to all players.
|
||||
|
||||
## Example Commands
|
||||
|
||||
- Start quiz:
|
||||
`/quiz start`
|
||||
- Join quiz:
|
||||
`/quiz join`
|
||||
- Answer a question:
|
||||
`Q: B`
|
||||
`Q: Paris`
|
||||
- Next question:
|
||||
`Q: ?`
|
||||
- Leave quiz:
|
||||
`/quiz leave`
|
||||
- Stop quiz:
|
||||
`/quiz stop`
|
||||
|
||||
## Notes
|
||||
|
||||
- Only one quiz can be active at a time.
|
||||
- Players can only answer each question once.
|
||||
- The quizmaster is defined by the `bbs_admin_list` variable.
|
||||
- Questions must be formatted correctly in the JSON file for the game to function.
|
||||
|
||||
---
|
||||
|
||||
**Written for Meshtastic mesh-bot by K7MHI Kelly Keeton 2025**
|
||||
|
||||
Certainly! Here’s documentation for the **Survey Game Module** in the same format as your other game modules:
|
||||
|
||||
---
|
||||
|
||||
# Survey Module "game"
|
||||
This module implements a survey system for the Meshtastic mesh-bot.
|
||||
|
||||
## How to Play
|
||||
|
||||
- **Start the Survey:**
|
||||
Users start a survey by specifying the survey name (e.g., `/survey start example`).
|
||||
The survey will prompt the user with the first question.
|
||||
|
||||
- **Answer Questions:**
|
||||
- For multiple choice: reply with a letter (A, B, C, ...).
|
||||
- For integer: reply with a number.
|
||||
- For text: reply with your answer as text.
|
||||
After each answer, the next question is shown automatically.
|
||||
|
||||
- **End the Survey:**
|
||||
The survey ends automatically after the last question, or the user can send `end` to finish early.
|
||||
Responses are saved to a CSV file.
|
||||
|
||||
## Rules & Features
|
||||
|
||||
- Surveys are defined in JSON files in surveys (e.g., `example_survey.json`).
|
||||
- Each survey can have multiple choice, integer, or text questions.
|
||||
- User responses are saved to a CSV file named `<survey_name>_responses.csv` in the same directory.
|
||||
- Users can only answer each question once per survey session.
|
||||
- Survey results can be summarized and reported by the bot.
|
||||
|
||||
## Example Commands
|
||||
|
||||
- Start a survey:
|
||||
`/survey start example`
|
||||
- Answer a multiple choice question:
|
||||
`A`
|
||||
- Answer an integer question:
|
||||
`42`
|
||||
- Answer a text question:
|
||||
`My favorite color is blue.`
|
||||
- End the survey early:
|
||||
`end`
|
||||
- Get survey results (admin):
|
||||
`/survey results example`
|
||||
|
||||
## Notes
|
||||
|
||||
- Only surveys listed in the surveys directory with the `_survey.json` suffix are available.
|
||||
- Each user’s responses are tracked separately.
|
||||
- Results are summarized and can be displayed by the bot.
|
||||
|
||||
---
|
||||
|
||||
**Written for Meshtastic mesh-bot by K7MHI Kelly Keeton 2025**
|
||||
@@ -7,7 +7,6 @@ import time
|
||||
import pickle
|
||||
|
||||
jack_starting_cash = 100 # Replace 100 with your desired starting cash value
|
||||
from modules.settings import jackTracker
|
||||
|
||||
SUITS = ("♥️", "♦️", "♠️", "♣️")
|
||||
RANKS = (
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
# Written for Meshtastic mesh-bot by ZR1RF Johannes le Roux 2025
|
||||
from modules.log import *
|
||||
import os
|
||||
import json
|
||||
import random
|
||||
|
||||
class Hangman:
|
||||
@@ -118,6 +121,25 @@ class Hangman:
|
||||
|
||||
def __init__(self):
|
||||
self.game = {}
|
||||
self.DEFAULT_WORDS = self.WORDS
|
||||
|
||||
|
||||
# Try to load hangman.json if it exists
|
||||
hangman_json_path = os.path.join('data', 'hangman.json')
|
||||
if os.path.exists(hangman_json_path):
|
||||
try:
|
||||
with open(hangman_json_path, 'r') as f:
|
||||
words = json.load(f)
|
||||
# Ensure it's a list of strings
|
||||
if isinstance(words, list) and all(isinstance(w, str) for w in words):
|
||||
self.WORDS = words
|
||||
else:
|
||||
self.WORDS = self.DEFAULT_WORDS
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
logger.warning("Failed to load hangman.json, using default words. example JSON: [\"apple\",\"banana\",\"cherry\"]")
|
||||
self.WORDS = self.DEFAULT_WORDS
|
||||
else:
|
||||
self.WORDS = self.DEFAULT_WORDS
|
||||
|
||||
def new_game(self, id):
|
||||
games = won = 0
|
||||
|
||||
@@ -23,7 +23,6 @@ lemonadeLemons = [{'nodeID': 0, 'cost': 4.00, 'count': 8, 'min': 2.00, 'unit': 0
|
||||
lemonadeSugar = [{'nodeID': 0, 'cost': 3.00, 'count': 15, 'min': 1.50, 'unit': 0.00}]
|
||||
lemonadeWeeks = [{'nodeID': 0, 'current': 1, 'total': lemon_total_weeks, 'sales': 99, 'potential': 0, 'unit': 0.00, 'price': 0.00, 'total_sales': 0}]
|
||||
lemonadeScore = [{'nodeID': 0, 'value': 0.00, 'total': 0.00}]
|
||||
from modules.settings import lemonadeTracker
|
||||
|
||||
def get_sales_amount(potential, unit, price):
|
||||
"""Gets the sales amount.
|
||||
|
||||
@@ -5,7 +5,7 @@ import random
|
||||
import time
|
||||
import pickle
|
||||
from modules.log import *
|
||||
from modules.settings import mindTracker
|
||||
|
||||
def chooseDifficultyMMind(message):
|
||||
usrInput = message.lower()
|
||||
msg = ''
|
||||
|
||||
@@ -6,9 +6,9 @@ import pickle
|
||||
from modules.log import *
|
||||
|
||||
vpStartingCash = 20
|
||||
from modules.settings import vpTracker
|
||||
# Define the Card class
|
||||
class CardVP:
|
||||
global vpTracker
|
||||
|
||||
card_values = { # value of the ace is high until it needs to be low
|
||||
2: 2,
|
||||
@@ -296,154 +296,159 @@ def loadHSVp():
|
||||
return 0
|
||||
|
||||
def playVideoPoker(nodeID, message):
|
||||
global vpTracker, vpStartingCash
|
||||
msg = ""
|
||||
try:
|
||||
# Initialize the player
|
||||
if getLastCmdVp(nodeID) is None or getLastCmdVp(nodeID) == "":
|
||||
# create new player if not in tracker
|
||||
logger.debug(f"System: VideoPoker: New Player {nodeID}")
|
||||
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'time': time.time(), 'cash': vpStartingCash, 'player': None, 'deck': None, 'highScore': 0, 'drawCount': 0})
|
||||
return f"You have {vpStartingCash} coins, \nWhats your bet?"
|
||||
|
||||
# Gather the player's bet
|
||||
if getLastCmdVp(nodeID) == "new" or getLastCmdVp(nodeID) == "gameOver":
|
||||
# Initialize shuffled Deck and Player
|
||||
player = PlayerVP()
|
||||
deck = DeckVP()
|
||||
deck.shuffle()
|
||||
drawCount = 1
|
||||
bet = 0
|
||||
msg = ''
|
||||
|
||||
# Initialize the player
|
||||
if getLastCmdVp(nodeID) is None or getLastCmdVp(nodeID=nodeID) == "":
|
||||
# create new player if not in tracker
|
||||
logger.debug(f"System: VideoPoker: New Player {nodeID}")
|
||||
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'time': time.time(), 'cash': vpStartingCash, 'player': None, 'deck': None, 'highScore': 0, 'drawCount': 0})
|
||||
return f"You have {vpStartingCash} coins, \nWhats your bet?"
|
||||
|
||||
# Gather the player's bet
|
||||
if getLastCmdVp(nodeID) == "new" or getLastCmdVp(nodeID) == "gameOver":
|
||||
# Initialize shuffled Deck and Player
|
||||
player = PlayerVP()
|
||||
deck = DeckVP()
|
||||
deck.shuffle()
|
||||
drawCount = 1
|
||||
bet = 0
|
||||
msg = ''
|
||||
|
||||
# load the player bankroll from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
player.bankroll = vpTracker[i]['cash']
|
||||
vpTracker[i]['time'] = time.time()
|
||||
|
||||
# Detect if message is a bet
|
||||
try:
|
||||
bet = int(message)
|
||||
except ValueError:
|
||||
msg += f"Please enter a valid bet, 1 to 5 coins. you have {player.bankroll} coins."
|
||||
|
||||
# Check if bet is valid
|
||||
if bet > player.bankroll:
|
||||
msg += f"You can only bet the money you have. {player.bankroll} coins, No strip poker here..."
|
||||
elif bet < 1:
|
||||
msg += "You must bet at least 1 coin.🪙"
|
||||
elif bet > 5:
|
||||
msg += "The 🎰 coin slot only fits 5 coins max."
|
||||
|
||||
# if msg contains an error, return it
|
||||
if msg is not None and msg != '':
|
||||
return msg
|
||||
else:
|
||||
# Take the bet
|
||||
player.bet(str(message))
|
||||
# Bet placed, start the game
|
||||
setLastCmdVp(nodeID, "playing")
|
||||
|
||||
# save player and deck to tracker
|
||||
# load the player bankroll from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = player
|
||||
vpTracker[i]['deck'] = deck
|
||||
vpTracker[i]['cash'] = player.bankroll
|
||||
player.bankroll = vpTracker[i]['cash']
|
||||
vpTracker[i]['time'] = time.time()
|
||||
|
||||
# Play the game
|
||||
if getLastCmdVp(nodeID) == "playing":
|
||||
msg = ''
|
||||
|
||||
player.draw_cards(deck)
|
||||
msg += player.show_hand()
|
||||
# give hint to player
|
||||
msg += player.score_hand(resetHand=False)
|
||||
|
||||
# save player and deck to tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = player
|
||||
vpTracker[i]['deck'] = deck
|
||||
vpTracker[i]['drawCount'] = drawCount
|
||||
# Detect if message is a bet
|
||||
try:
|
||||
bet = int(message)
|
||||
except ValueError:
|
||||
msg += f"Please enter a valid bet, 1 to 5 coins. you have {player.bankroll} coins."
|
||||
|
||||
msg += f"\nDeal new card? \nex: 1,3,4 or (N)o,(A)ll (H)and"
|
||||
setLastCmdVp(nodeID, "redraw")
|
||||
return msg
|
||||
|
||||
if getLastCmdVp(nodeID) == "redraw":
|
||||
msg = ''
|
||||
# load the player and deck from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
player = vpTracker[i]['player']
|
||||
deck = vpTracker[i]['deck']
|
||||
drawCount = vpTracker[i]['drawCount']
|
||||
# Check if bet is valid
|
||||
if bet > player.bankroll:
|
||||
msg += f"You can only bet the money you have. {player.bankroll} coins, No strip poker here..."
|
||||
elif bet < 1:
|
||||
msg += "You must bet at least 1 coin.🪙"
|
||||
elif bet > 5:
|
||||
msg += "The 🎰 coin slot only fits 5 coins max."
|
||||
|
||||
# if msg contains an error, return it
|
||||
if msg is not None and msg != '':
|
||||
return msg
|
||||
else:
|
||||
# Take the bet
|
||||
player.bet(str(message))
|
||||
# Bet placed, start the game
|
||||
setLastCmdVp(nodeID, "playing")
|
||||
|
||||
# if player wants to redraw cards, and not done already
|
||||
if message.lower().startswith("n"):
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
if message.lower().startswith("h"):
|
||||
msg = player.show_hand()
|
||||
return msg
|
||||
else:
|
||||
if drawCount <= 1:
|
||||
msg = player.redraw(deck, message)
|
||||
if msg.startswith("ex:"):
|
||||
# if returned error message, return it
|
||||
return msg
|
||||
drawCount += 1
|
||||
# save player and deck to tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = player
|
||||
vpTracker[i]['deck'] = deck
|
||||
vpTracker[i]['drawCount'] = drawCount
|
||||
if drawCount == 2:
|
||||
# this is the last draw will carry on to endGame for scoring
|
||||
msg = player.redraw(deck, message) + f"\n"
|
||||
vpTracker[i]['cash'] = player.bankroll
|
||||
|
||||
# Play the game
|
||||
if getLastCmdVp(nodeID) == "playing":
|
||||
msg = ''
|
||||
|
||||
player.draw_cards(deck)
|
||||
msg += player.show_hand()
|
||||
# give hint to player
|
||||
msg += player.score_hand(resetHand=False)
|
||||
|
||||
# save player and deck to tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = player
|
||||
vpTracker[i]['deck'] = deck
|
||||
vpTracker[i]['drawCount'] = drawCount
|
||||
|
||||
msg += f"\nDeal new card? \nex: 1,3,4 or (N)o,(A)ll (H)and"
|
||||
setLastCmdVp(nodeID, "redraw")
|
||||
return msg
|
||||
|
||||
if getLastCmdVp(nodeID) == "redraw":
|
||||
msg = ''
|
||||
# load the player and deck from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
player = vpTracker[i]['player']
|
||||
deck = vpTracker[i]['deck']
|
||||
drawCount = vpTracker[i]['drawCount']
|
||||
|
||||
# if player wants to redraw cards, and not done already
|
||||
if message.lower().startswith("n"):
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
if message.lower().startswith("h"):
|
||||
msg = player.show_hand()
|
||||
return msg
|
||||
else:
|
||||
if drawCount <= 1:
|
||||
msg = player.redraw(deck, message)
|
||||
if msg.startswith("ex:"):
|
||||
# if returned error message, return it
|
||||
return msg
|
||||
# redraw done
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
drawCount += 1
|
||||
# save player and deck to tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = player
|
||||
vpTracker[i]['deck'] = deck
|
||||
vpTracker[i]['drawCount'] = drawCount
|
||||
if drawCount == 2:
|
||||
# this is the last draw will carry on to endGame for scoring
|
||||
msg = player.redraw(deck, message) + f"\n"
|
||||
if msg.startswith("ex:"):
|
||||
# if returned error message, return it
|
||||
return msg
|
||||
# redraw done
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
else:
|
||||
# show redrawn hand
|
||||
return msg
|
||||
else:
|
||||
# show redrawn hand
|
||||
return msg
|
||||
else:
|
||||
# redraw already done
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
|
||||
if getLastCmdVp(nodeID) == "endGame":
|
||||
# load the player and deck from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
player = vpTracker[i]['player']
|
||||
deck = vpTracker[i]['deck']
|
||||
# redraw already done
|
||||
setLastCmdVp(nodeID, "endGame")
|
||||
|
||||
if getLastCmdVp(nodeID) == "endGame":
|
||||
# load the player and deck from tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
player = vpTracker[i]['player']
|
||||
deck = vpTracker[i]['deck']
|
||||
|
||||
msg += player.score_hand()
|
||||
msg += player.score_hand()
|
||||
|
||||
if player.bankroll < 1:
|
||||
player.bankroll = vpStartingCash
|
||||
msg += f"\nLooks 💸 like you're out of money. 💳 resetting ballance 🏧"
|
||||
elif player.bankroll > vpTracker[i]['highScore']:
|
||||
vpTracker[i]['highScore'] = player.bankroll
|
||||
msg += " 🎉HighScore!"
|
||||
# save high score
|
||||
saveHSVp(nodeID, vpTracker[i]['highScore'])
|
||||
if player.bankroll < 1:
|
||||
player.bankroll = vpStartingCash
|
||||
msg += f"\nLooks 💸 like you're out of money. 💳 resetting ballance 🏧"
|
||||
elif player.bankroll > vpTracker[i]['highScore']:
|
||||
vpTracker[i]['highScore'] = player.bankroll
|
||||
msg += " 🎉HighScore!"
|
||||
# save high score
|
||||
saveHSVp(nodeID, vpTracker[i]['highScore'])
|
||||
|
||||
msg += f"\nPlace your Bet, or (L)eave Table."
|
||||
msg += f"\nPlace your Bet, or (L)eave Table."
|
||||
|
||||
setLastCmdVp(nodeID, "gameOver")
|
||||
# reset player and deck in tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = None
|
||||
vpTracker[i]['deck'] = None
|
||||
vpTracker[i]['drawCount'] = 0
|
||||
# save bankroll
|
||||
vpTracker[i]['cash'] = player.bankroll
|
||||
setLastCmdVp(nodeID, "gameOver")
|
||||
# reset player and deck in tracker
|
||||
for i in range(len(vpTracker)):
|
||||
if vpTracker[i]['nodeID'] == nodeID:
|
||||
vpTracker[i]['player'] = None
|
||||
vpTracker[i]['deck'] = None
|
||||
vpTracker[i]['drawCount'] = 0
|
||||
# save bankroll
|
||||
vpTracker[i]['cash'] = player.bankroll
|
||||
|
||||
return msg
|
||||
return msg
|
||||
# At the end of the try block, if nothing returned yet:
|
||||
return msg if msg else 'No action taken.'
|
||||
except Exception as e:
|
||||
logger.warning(f"System: VideoPoker: Error {e}")
|
||||
return 'No Game in progress'
|
||||
|
||||
|
||||
@@ -178,16 +178,21 @@ def get_google_context(input, num_results):
|
||||
|
||||
def send_ollama_query(llmQuery):
|
||||
# Send the query to the Ollama API and return the response
|
||||
result = requests.post(ollamaAPI, data=json.dumps(llmQuery))
|
||||
if result.status_code == 200:
|
||||
result_json = result.json()
|
||||
result = result_json.get("response", "")
|
||||
# deepseek has added <think> </think> tags to the response
|
||||
if "<think>" in result:
|
||||
result = result.split("</think>")[1]
|
||||
else:
|
||||
raise Exception(f"HTTP Error: {result.status_code}")
|
||||
return result
|
||||
try:
|
||||
result = requests.post(ollamaAPI, data=json.dumps(llmQuery), timeout=5)
|
||||
if result.status_code == 200:
|
||||
result_json = result.json()
|
||||
result = result_json.get("response", "")
|
||||
# deepseek has added <think> </think> tags to the response
|
||||
if "<think>" in result:
|
||||
result = result.split("</think>")[1]
|
||||
else:
|
||||
logger.warning(f"System: LLM Query: Ollama API returned status code {result.status_code}")
|
||||
return f"⛔️ Request Error"
|
||||
return result
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"System: LLM Query: Ollama API request failed: {e}")
|
||||
return f"⛔️ Request Error"
|
||||
|
||||
def send_ollama_tooling_query(prompt, functions, model=None, max_tokens=450):
|
||||
"""
|
||||
|
||||
@@ -752,10 +752,10 @@ def get_nws_marine(zone, days=3):
|
||||
try:
|
||||
marine_pz_data = requests.get(zone, timeout=urlTimeoutSeconds)
|
||||
if not marine_pz_data.ok:
|
||||
logger.warning("Location:Error fetching NWS Marine PZ data")
|
||||
logger.warning(f"Location:Error fetching NWS Marine data (HTTP {marine_pz_data.status_code})")
|
||||
return ERROR_FETCHING_DATA
|
||||
except (requests.exceptions.RequestException):
|
||||
logger.warning("Location:Error fetching NWS Marine PZ data")
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Location:Error fetching NWS Marine data: {e}")
|
||||
return ERROR_FETCHING_DATA
|
||||
|
||||
marine_pz_data = marine_pz_data.text
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
# Module to respomnd to new nodes we havent seen before with a hello message
|
||||
# K7MHI Kelly Keeton 2024
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
from modules.log import *
|
||||
|
||||
def initalize_qrz_database():
|
||||
# create the database
|
||||
conn = sqlite3.connect(qrz_db)
|
||||
c = conn.cursor()
|
||||
# Check if the qrz table exists, and create it if it doesn't
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS qrz
|
||||
(qrz_id INTEGER PRIMARY KEY, qrz_call TEXT, qrz_name TEXT, qrz_qth TEXT, qrz_notes TEXT)''')
|
||||
conn.commit()
|
||||
conn.close()
|
||||
try:
|
||||
# If the database file doesn't exist, it will be created by sqlite3.connect
|
||||
if not os.path.exists(qrz_db):
|
||||
logger.info(f"QRZ database file '{qrz_db}' not found. Creating new database.")
|
||||
conn = sqlite3.connect(qrz_db)
|
||||
c = conn.cursor()
|
||||
# Create the table if it doesn't exist
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS qrz
|
||||
(qrz_id INTEGER PRIMARY KEY, qrz_call TEXT, qrz_name TEXT, qrz_qth TEXT, qrz_notes TEXT)''')
|
||||
conn.commit()
|
||||
return True
|
||||
except sqlite3.Error as e:
|
||||
logger.error(f"Error initializing QRZ database: {e}")
|
||||
return False
|
||||
finally:
|
||||
if 'conn' in locals():
|
||||
conn.close()
|
||||
|
||||
def never_seen_before(nodeID):
|
||||
# check if we have seen this node before and sent a hello message
|
||||
|
||||
@@ -100,6 +100,9 @@ def get_freq_common_name(freq):
|
||||
|
||||
def get_hamlib(msg="f"):
|
||||
# get data from rigctld server
|
||||
if "socket" not in globals():
|
||||
logger.warning("RadioMon: 'socket' module not imported. Hamlib disabled.")
|
||||
return ERROR_FETCHING_DATA
|
||||
try:
|
||||
rigControlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
rigControlSocket.settimeout(2)
|
||||
|
||||
@@ -6,6 +6,9 @@ import html
|
||||
from html.parser import HTMLParser
|
||||
import bs4 as bs
|
||||
|
||||
# Common User-Agent for all RSS requests
|
||||
COMMON_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
|
||||
|
||||
class MLStripper(HTMLParser):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
@@ -54,7 +57,7 @@ def get_rss_feed(msg):
|
||||
|
||||
try:
|
||||
logger.debug(f"Fetching RSS feed from {feed_url} from message '{msg}'")
|
||||
agent = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
|
||||
agent = {'User-Agent': COMMON_USER_AGENT}
|
||||
request = urllib.request.Request(feed_url, headers=agent)
|
||||
with urllib.request.urlopen(request, timeout=urlTimeoutSeconds) as response:
|
||||
xml_data = response.read()
|
||||
@@ -63,7 +66,11 @@ def get_rss_feed(msg):
|
||||
items = root.findall('.//item')
|
||||
ns = None
|
||||
if not items:
|
||||
# Try to find the namespace dynamically
|
||||
# Try Atom <entry> elements (Reddit, etc.)
|
||||
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
|
||||
ns = 'http://www.w3.org/2005/Atom'
|
||||
if not items:
|
||||
# Try to find the namespace dynamically for RSS
|
||||
for elem in root.iter():
|
||||
if elem.tag.endswith('item'):
|
||||
ns_uri = elem.tag.split('}')[0].strip('{')
|
||||
@@ -72,22 +79,30 @@ def get_rss_feed(msg):
|
||||
break
|
||||
items = items[:RSS_RETURN_COUNT]
|
||||
if not items:
|
||||
return "No RSS feed entries found."
|
||||
logger.debug(f"No RSS or Atom feed entries found in feed xml_data: {xml_data[:500]}...")
|
||||
return "No RSS or Atom feed entries found."
|
||||
formatted_entries = []
|
||||
for item in items:
|
||||
if ns:
|
||||
title = item.findtext(f'{{{ns}}}title', default='No title')
|
||||
link = item.findtext(f'{{{ns}}}link', default=None)
|
||||
description = item.findtext(f'{{{ns}}}description', default='No description')
|
||||
pub_date = item.findtext(f'{{{ns}}}pubDate', default='No date')
|
||||
if ns == 'http://www.w3.org/2005/Atom':
|
||||
# Atom feed
|
||||
title = item.findtext('{http://www.w3.org/2005/Atom}title', default='No title')
|
||||
# Atom links are in <link href="..."/>
|
||||
link_elem = item.find('{http://www.w3.org/2005/Atom}link')
|
||||
link = link_elem.attrib.get('href') if link_elem is not None else None
|
||||
# Atom content or summary
|
||||
description = item.findtext('{http://www.w3.org/2005/Atom}content')
|
||||
if not description:
|
||||
description = item.findtext('{http://www.w3.org/2005/Atom}summary', default='No description')
|
||||
pub_date = item.findtext('{http://www.w3.org/2005/Atom}updated', default='No date')
|
||||
else:
|
||||
# RSS feed
|
||||
title = item.findtext('title', default='No title')
|
||||
link = item.findtext('link', default=None)
|
||||
description = item.findtext('description', default='No description')
|
||||
pub_date = item.findtext('pubDate', default='No date')
|
||||
|
||||
# Unescape HTML entities and strip tags
|
||||
description = html.unescape(description)
|
||||
description = html.unescape(description) if description else ""
|
||||
description = strip_tags(description)
|
||||
if len(description) > RSS_TRIM_LENGTH:
|
||||
description = description[:RSS_TRIM_LENGTH - 3] + "..."
|
||||
|
||||
@@ -150,8 +150,8 @@ if dad_jokes_enabled:
|
||||
# Wikipedia Search Configuration
|
||||
if wikipedia_enabled:
|
||||
from modules.wiki import * # from the spudgunman/meshing-around repo
|
||||
trap_list = trap_list + ("wiki:",)
|
||||
help_message = help_message + ", wiki:"
|
||||
trap_list = trap_list + ("wiki",)
|
||||
help_message = help_message + ", wiki"
|
||||
|
||||
# RSS Feed Configuration
|
||||
if rssEnable:
|
||||
|
||||
402
modules/test_bot.py
Normal file
402
modules/test_bot.py
Normal file
@@ -0,0 +1,402 @@
|
||||
# test_bot.py
|
||||
# Unit tests for various modules in the meshing-around project
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
import importlib
|
||||
import pkgutil
|
||||
import warnings
|
||||
# Suppress ResourceWarning warnings for asyncio unclosed event here
|
||||
warnings.filterwarnings("ignore", category=ResourceWarning)
|
||||
|
||||
|
||||
# Add the parent directory to sys.path to allow module imports
|
||||
parent_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
sys.path.append(parent_path)
|
||||
modules_path = os.path.join(parent_path, 'modules')
|
||||
|
||||
# Limits API calls during testing
|
||||
CHECKALL = False
|
||||
# Check for a file named .checkall in the parent directory
|
||||
checkall_path = os.path.join(parent_path, '.checkall')
|
||||
if os.path.isfile(checkall_path):
|
||||
CHECKALL = True
|
||||
|
||||
|
||||
# List of module names to exclude
|
||||
exclude = ['test_bot','udp', 'system', 'log', 'gpio', 'web',]
|
||||
available_modules = [
|
||||
m.name for m in pkgutil.iter_modules([modules_path])
|
||||
if m.name not in exclude]
|
||||
|
||||
try:
|
||||
print("\nImporting Core Modules:")
|
||||
from modules.log import *
|
||||
print(" ✔ Imported 'log'")
|
||||
# Set location default
|
||||
lat = latitudeValue
|
||||
lon = longitudeValue
|
||||
print(f" ✔ Location set to Latitude: {lat}, Longitude: {lon}")
|
||||
from modules.system import *
|
||||
print(" ✔ Imported 'system'")
|
||||
|
||||
print("\nImporting non-excluded modules:")
|
||||
for module_name in [m.name for m in pkgutil.iter_modules([modules_path])]:
|
||||
if module_name not in exclude:
|
||||
importlib.import_module(module_name)
|
||||
print(f" ✔ Imported '{module_name}'")
|
||||
except Exception as e:
|
||||
print(f"\nError importing modules: {e}")
|
||||
print("Run this program from the main program directory: python3 script/test_bot.py")
|
||||
exit(1)
|
||||
|
||||
class TestBot(unittest.TestCase):
|
||||
def test_example(self):
|
||||
# Example test case
|
||||
self.assertEqual(1 + 1, 2)
|
||||
|
||||
def test_load_bbsdb(self):
|
||||
from bbstools import load_bbsdb
|
||||
test_load = load_bbsdb()
|
||||
self.assertTrue(test_load)
|
||||
|
||||
def test_bbs_list_messages(self):
|
||||
from bbstools import bbs_list_messages
|
||||
messages = bbs_list_messages()
|
||||
print("list_messages() returned:", messages)
|
||||
self.assertIsInstance(messages, str)
|
||||
|
||||
def test_initialize_checklist_database(self):
|
||||
from checklist import initialize_checklist_database, process_checklist_command
|
||||
result = initialize_checklist_database()
|
||||
result1 = process_checklist_command(0, 'checklist', name="none", location="none")
|
||||
self.assertTrue(result)
|
||||
self.assertIsInstance(result1, str)
|
||||
|
||||
def test_init_news_sources(self):
|
||||
from filemon import initNewsSources
|
||||
result = initNewsSources()
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_get_nina_alerts(self):
|
||||
from globalalert import get_nina_alerts
|
||||
alerts = get_nina_alerts()
|
||||
self.assertIsInstance(alerts, str)
|
||||
|
||||
def test_llmTool_get_google(self):
|
||||
from llm import llmTool_get_google
|
||||
result = llmTool_get_google("What is 2+2?", 1)
|
||||
self.assertIsInstance(result, list)
|
||||
|
||||
def test_send_ollama_query(self):
|
||||
from llm import send_ollama_query
|
||||
response = send_ollama_query("Hello, Ollama!")
|
||||
self.assertIsInstance(response, str)
|
||||
|
||||
def test_get_moon_phase(self):
|
||||
from space import get_moon
|
||||
phase = get_moon(lat, lon)
|
||||
self.assertIsInstance(phase, str)
|
||||
|
||||
def test_get_sun_times(self):
|
||||
from space import get_sun
|
||||
sun_times = get_sun(lat, lon)
|
||||
self.assertIsInstance(sun_times, str)
|
||||
|
||||
def test_hf_band_conditions(self):
|
||||
from space import hf_band_conditions
|
||||
conditions = hf_band_conditions()
|
||||
self.assertIsInstance(conditions, str)
|
||||
|
||||
def test_get_wikipedia_summary(self):
|
||||
from wiki import get_wikipedia_summary
|
||||
summary = get_wikipedia_summary("Python", location=(lat, lon))
|
||||
self.assertIsInstance(summary, str)
|
||||
|
||||
def test_get_kiwix_summary(self):
|
||||
from wiki import get_kiwix_summary
|
||||
summary = get_kiwix_summary("Python")
|
||||
self.assertIsInstance(summary, str)
|
||||
|
||||
def get_openskynetwork(self):
|
||||
from locationdata import get_openskynetwork
|
||||
flights = get_openskynetwork(lat, lon)
|
||||
self.assertIsInstance(flights, str)
|
||||
|
||||
def test_initalize_qrz_database(self):
|
||||
from qrz import initalize_qrz_database
|
||||
result = initalize_qrz_database()
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_get_hamlib(self):
|
||||
from radio import get_hamlib
|
||||
frequency = get_hamlib('f')
|
||||
self.assertIsInstance(frequency, str)
|
||||
|
||||
def test_get_rss_feed(self):
|
||||
from rss import get_rss_feed
|
||||
result = get_rss_feed('')
|
||||
self.assertIsInstance(result, str)
|
||||
|
||||
|
||||
##### GAMES Tests #####
|
||||
|
||||
|
||||
|
||||
def test_tictactoe_initial_and_move(self):
|
||||
from games.tictactoe import tictactoe
|
||||
user_id = "testuser"
|
||||
# Start a new game (no move yet)
|
||||
initial = tictactoe.play(user_id, "")
|
||||
print("Initial response:", initial)
|
||||
# Make a move, e.g., '1'
|
||||
second = tictactoe.play(user_id, "1")
|
||||
print("After move '1':", second)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(second, str)
|
||||
|
||||
|
||||
def test_playVideoPoker(self):
|
||||
from games.videopoker import playVideoPoker
|
||||
user_id = "testuser"
|
||||
# Start a new game/session
|
||||
initial = playVideoPoker(user_id, 'deal')
|
||||
print("Initial response:", initial)
|
||||
# Place a 5-coin bet
|
||||
after_bet = playVideoPoker(user_id, '5')
|
||||
print("After placing 5-coin bet:", after_bet)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_bet, str)
|
||||
|
||||
|
||||
def test_play_blackjack(self):
|
||||
from games.blackjack import playBlackJack
|
||||
user_id = "testuser"
|
||||
# Start a new game/session
|
||||
initial = playBlackJack(user_id, 'deal')
|
||||
print("Initial response:", initial)
|
||||
# Place a 5-chip bet
|
||||
after_bet = playBlackJack(user_id, '5')
|
||||
print("After placing 5-chip bet:", after_bet)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_bet, str)
|
||||
|
||||
|
||||
def test_hangman_initial_and_guess(self):
|
||||
from games.hangman import hangman
|
||||
user_id = "testuser"
|
||||
# Start a new game (no guess yet)
|
||||
initial = hangman.play(user_id, "")
|
||||
print("Initial response:", initial)
|
||||
# Guess a letter, e.g., 'e'
|
||||
second = hangman.play(user_id, "e")
|
||||
print("After guessing 'e':", second)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(second, str)
|
||||
|
||||
|
||||
def test_play_lemonade_stand(self):
|
||||
from games.lemonade import playLemonstand, lemonadeTracker
|
||||
user_id = "testuser"
|
||||
# Ensure user is in tracker
|
||||
if not any(u['nodeID'] == user_id for u in lemonadeTracker):
|
||||
lemonadeTracker.append({'nodeID': user_id, 'cups': 0, 'lemons': 0, 'sugar': 0, 'cash': 30.0, 'start': 30.0, 'cmd': 'new', 'last_played': 0})
|
||||
# Start a new game
|
||||
initial = playLemonstand(user_id, "", newgame=True)
|
||||
print("Initial response:", initial)
|
||||
# Buy 1 box of cups
|
||||
after_cups = playLemonstand(user_id, "1")
|
||||
print("After buying 1 box of cups:", after_cups)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_cups, str)
|
||||
|
||||
|
||||
def test_play_golfsim_one_hole(self):
|
||||
from games.golfsim import playGolf
|
||||
user_id = "testuser"
|
||||
# Start a new game/hole
|
||||
initial = playGolf(user_id, "", last_cmd="new")
|
||||
print("Initial hole info:", initial)
|
||||
# Take first shot with driver
|
||||
after_shot = playGolf(user_id, "driver")
|
||||
print("After hitting driver:", after_shot)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_shot, str)
|
||||
|
||||
|
||||
def test_play_dopewar_choose_city_and_list(self):
|
||||
from games.dopewar import playDopeWars
|
||||
user_id = 1234567899 # Use a unique test user ID
|
||||
# Start a new game, get city selection prompt
|
||||
initial = playDopeWars(user_id, "")
|
||||
print("Initial city selection:", initial)
|
||||
# Choose city 1
|
||||
after_city = playDopeWars(user_id, "1")
|
||||
print("After choosing city 1 (main game list):", after_city)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_city, str)
|
||||
|
||||
|
||||
def test_play_mastermind_one_guess(self):
|
||||
from games.mmind import start_mMind
|
||||
user_id = 1234567899 # Use a unique test user ID
|
||||
# Start a new game (should prompt for difficulty/colors)
|
||||
initial = start_mMind(user_id, "n")
|
||||
print("Initial response (difficulty/colors):", initial)
|
||||
# Make a guess (e.g., "RGBY" - valid for normal)
|
||||
after_guess = start_mMind(user_id, "RGBY")
|
||||
print("After guessing RGBY:", after_guess)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(after_guess, str)
|
||||
|
||||
|
||||
def test_quiz_game_answer_one_and_end(self):
|
||||
from games.quiz import quizGamePlayer
|
||||
quizmaster_id = "admin" # Use a valid quizmaster ID from bbs_admin_list
|
||||
user_id = "testuser"
|
||||
# Start the quiz as quizmaster
|
||||
start_msg = quizGamePlayer.start_game(quizmaster_id)
|
||||
print("Quiz start:", start_msg)
|
||||
# User joins the quiz
|
||||
join_msg = quizGamePlayer.join(user_id)
|
||||
print("User joined:", join_msg)
|
||||
# Get the first question (should be included in join_msg, but call explicitly for clarity)
|
||||
question_msg = quizGamePlayer.next_question(user_id)
|
||||
print("First question:", question_msg)
|
||||
# Simulate answering with 'A' (adjust if your first question expects a different answer)
|
||||
answer_msg = quizGamePlayer.answer(user_id, "A")
|
||||
print("Answer response:", answer_msg)
|
||||
# End the quiz as quizmaster
|
||||
end_msg = quizGamePlayer.stop_game(quizmaster_id)
|
||||
print("Quiz end:", end_msg)
|
||||
self.assertIsInstance(start_msg, str)
|
||||
self.assertIsInstance(join_msg, str)
|
||||
self.assertIsInstance(question_msg, str)
|
||||
self.assertIsInstance(answer_msg, str)
|
||||
self.assertIsInstance(end_msg, str)
|
||||
|
||||
def test_survey_answer_one_and_end(self):
|
||||
from survey import survey_module
|
||||
user_id = "testuser"
|
||||
survey_name = "example" # Make sure this survey exists in your data/surveys directory
|
||||
|
||||
# Start the survey
|
||||
start_msg = survey_module.start_survey(user_id, survey_name)
|
||||
print("Survey start:", start_msg)
|
||||
# Answer the first question with 'A' (adjust if your survey expects a different type)
|
||||
answer_msg = survey_module.answer(user_id, "A")
|
||||
print("Answer response:", answer_msg)
|
||||
# End the survey
|
||||
end_msg = survey_module.end_survey(user_id)
|
||||
print("Survey end:", end_msg)
|
||||
|
||||
self.assertIsInstance(start_msg, str)
|
||||
self.assertIsInstance(answer_msg, str)
|
||||
self.assertIsInstance(end_msg, str)
|
||||
|
||||
|
||||
|
||||
def test_hamtest_answer_one(self):
|
||||
from games.hamtest import hamtest
|
||||
user_id = "testuser"
|
||||
# Start a new ham test game (default level: technician)
|
||||
initial = hamtest.newGame(user_id)
|
||||
print("Initial question:", initial)
|
||||
# Answer the first question with 'A'
|
||||
answer_msg = hamtest.answer(user_id, "A")
|
||||
print("Answer response:", answer_msg)
|
||||
self.assertIsInstance(initial, str)
|
||||
self.assertIsInstance(answer_msg, str)
|
||||
|
||||
|
||||
|
||||
##### API Tests - Extended tests run only if CHECKALL is True #####
|
||||
|
||||
|
||||
if CHECKALL:
|
||||
logger.info("Running extended API tests as CHECKALL is enabled.")
|
||||
def test_getRepeaterBook(self):
|
||||
from locationdata import getRepeaterBook
|
||||
repeaters = getRepeaterBook(lat, lon)
|
||||
self.assertIsInstance(repeaters, str)
|
||||
|
||||
def test_getArtSciRepeaters(self):
|
||||
from locationdata import getArtSciRepeaters
|
||||
repeaters = getArtSciRepeaters(lat, lon)
|
||||
self.assertIsInstance(repeaters, str)
|
||||
|
||||
def test_get_NOAAtides(self):
|
||||
from locationdata import get_NOAAtide
|
||||
tides = get_NOAAtide(lat, lon)
|
||||
self.assertIsInstance(tides, str)
|
||||
|
||||
def test_get_NOAAweather(self):
|
||||
from locationdata import get_NOAAweather
|
||||
weather = get_NOAAweather(lat, lon)
|
||||
self.assertIsInstance(weather, str)
|
||||
|
||||
def test_where_am_i(self):
|
||||
from locationdata import where_am_i
|
||||
location = where_am_i(lat, lon)
|
||||
self.assertIsInstance(location, str)
|
||||
|
||||
def test_getWeatherAlertsNOAA(self):
|
||||
from locationdata import getWeatherAlertsNOAA
|
||||
alerts = getWeatherAlertsNOAA(lat, lon)
|
||||
if isinstance(alerts, tuple):
|
||||
self.assertIsInstance(alerts[0], str)
|
||||
else:
|
||||
self.assertIsInstance(alerts, str)
|
||||
|
||||
def test_getActiveWeatherAlertsDetailNOAA(self):
|
||||
from locationdata import getActiveWeatherAlertsDetailNOAA
|
||||
alerts_detail = getActiveWeatherAlertsDetailNOAA(lat, lon)
|
||||
self.assertIsInstance(alerts_detail, str)
|
||||
|
||||
def test_getIpawsAlerts(self):
|
||||
from locationdata import getIpawsAlert
|
||||
alerts = getIpawsAlert(lat, lon)
|
||||
self.assertIsInstance(alerts, str)
|
||||
|
||||
def test_get_flood_noaa(self):
|
||||
from locationdata import get_flood_noaa
|
||||
flood_info = get_flood_noaa(lat, lon, 12484500) # Example gauge UID
|
||||
self.assertIsInstance(flood_info, str)
|
||||
|
||||
def test_get_volcano_usgs(self):
|
||||
from locationdata import get_volcano_usgs
|
||||
volcano_info = get_volcano_usgs(lat, lon)
|
||||
self.assertIsInstance(volcano_info, str)
|
||||
|
||||
def test_get_nws_marine_alerts(self):
|
||||
from locationdata import get_nws_marine
|
||||
marine_alerts = get_nws_marine('https://tgftp.nws.noaa.gov/data/forecasts/marine/coastal/pz/pzz135.txt',1) # Example zone
|
||||
self.assertIsInstance(marine_alerts, str)
|
||||
|
||||
def test_checkUSGSEarthQuakes(self):
|
||||
from locationdata import checkUSGSEarthQuake
|
||||
earthquakes = checkUSGSEarthQuake(lat, lon)
|
||||
self.assertIsInstance(earthquakes, str)
|
||||
|
||||
def test_getNextSatellitePass(self):
|
||||
from space import getNextSatellitePass
|
||||
pass_info = getNextSatellitePass('25544', lat, lon)
|
||||
self.assertIsInstance(pass_info, str)
|
||||
|
||||
def test_get_wx_meteo(self):
|
||||
from wx_meteo import get_wx_meteo
|
||||
weather_report = get_wx_meteo(lat, lon)
|
||||
self.assertIsInstance(weather_report, str)
|
||||
|
||||
def test_get_flood_openmeteo(self):
|
||||
from wx_meteo import get_flood_openmeteo
|
||||
flood_report = get_flood_openmeteo(lat, lon)
|
||||
self.assertIsInstance(flood_report, str)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if not CHECKALL:
|
||||
print("\nNote: Extended API tests are skipped. To enable them, create a file named '.checkall' in the parent directory.\n")
|
||||
unittest.main()
|
||||
244
modules/wiki.py
244
modules/wiki.py
@@ -1,121 +1,159 @@
|
||||
# meshbot wiki module
|
||||
|
||||
from modules.log import *
|
||||
import wikipedia # pip install wikipedia
|
||||
#import wikipedia # pip install wikipedia
|
||||
import requests
|
||||
import bs4 as bs
|
||||
from urllib.parse import quote
|
||||
|
||||
# Kiwix support for local wiki
|
||||
if use_kiwix_server:
|
||||
import requests
|
||||
import bs4 as bs
|
||||
from urllib.parse import quote
|
||||
def tag_visible(element):
|
||||
"""Filter visible text from HTML elements for Kiwix"""
|
||||
if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
|
||||
return False
|
||||
if isinstance(element, bs.element.Comment):
|
||||
return False
|
||||
return True
|
||||
|
||||
# Kiwix helper functions (only loaded if use_kiwix_server is True)
|
||||
if wikipedia_enabled and use_kiwix_server:
|
||||
def tag_visible(element):
|
||||
"""Filter visible text from HTML elements for Kiwix"""
|
||||
if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
|
||||
return False
|
||||
if isinstance(element, bs.element.Comment):
|
||||
return False
|
||||
return True
|
||||
def text_from_html(body):
|
||||
"""Extract visible text from HTML content"""
|
||||
soup = bs.BeautifulSoup(body, 'html.parser')
|
||||
texts = soup.find_all(string=True)
|
||||
visible_texts = filter(tag_visible, texts)
|
||||
return " ".join(t.strip() for t in visible_texts if t.strip())
|
||||
|
||||
def text_from_html(body):
|
||||
"""Extract visible text from HTML content"""
|
||||
soup = bs.BeautifulSoup(body, 'html.parser')
|
||||
texts = soup.find_all(string=True)
|
||||
visible_texts = filter(tag_visible, texts)
|
||||
return " ".join(t.strip() for t in visible_texts if t.strip())
|
||||
def get_kiwix_summary(search_term):
|
||||
"""Query local Kiwix server for Wikipedia article"""
|
||||
if search_term is None or search_term.strip() == "":
|
||||
return ERROR_FETCHING_DATA
|
||||
try:
|
||||
search_encoded = quote(search_term)
|
||||
# Try direct article access first
|
||||
wiki_article = search_encoded.capitalize().replace("%20", "_")
|
||||
exact_url = f"{kiwix_url}/raw/{kiwix_library_name}/content/A/{wiki_article}"
|
||||
|
||||
response = requests.get(exact_url, timeout=urlTimeoutSeconds)
|
||||
if response.status_code == 200:
|
||||
# Extract and clean text
|
||||
text = text_from_html(response.text)
|
||||
# Remove common Wikipedia metadata prefixes
|
||||
text = text.split("Jump to navigation", 1)[-1]
|
||||
text = text.split("Jump to search", 1)[-1]
|
||||
# Truncate to reasonable length (first few sentences)
|
||||
sentences = text.split('. ')
|
||||
summary = '. '.join(sentences[:wiki_return_limit])
|
||||
if summary and not summary.endswith('.'):
|
||||
summary += '.'
|
||||
return summary.strip()[:500] # Hard limit at 500 chars
|
||||
|
||||
def get_kiwix_summary(search_term):
|
||||
"""Query local Kiwix server for Wikipedia article"""
|
||||
try:
|
||||
search_encoded = quote(search_term)
|
||||
# Try direct article access first
|
||||
wiki_article = search_encoded.capitalize().replace("%20", "_")
|
||||
exact_url = f"{kiwix_url}/raw/{kiwix_library_name}/content/A/{wiki_article}"
|
||||
# If direct access fails, try search
|
||||
logger.debug(f"System: Kiwix direct article not found for:{search_term} Status Code:{response.status_code}")
|
||||
search_url = f"{kiwix_url}/search?content={kiwix_library_name}&pattern={search_encoded}"
|
||||
response = requests.get(search_url, timeout=urlTimeoutSeconds)
|
||||
|
||||
if response.status_code == 200 and "No results were found" not in response.text:
|
||||
soup = bs.BeautifulSoup(response.text, 'html.parser')
|
||||
links = [a['href'] for a in soup.find_all('a', href=True) if "start=" not in a['href']]
|
||||
|
||||
response = requests.get(exact_url, timeout=urlTimeoutSeconds)
|
||||
if response.status_code == 200:
|
||||
# Extract and clean text
|
||||
text = text_from_html(response.text)
|
||||
# Remove common Wikipedia metadata prefixes
|
||||
text = text.split("Jump to navigation", 1)[-1]
|
||||
text = text.split("Jump to search", 1)[-1]
|
||||
# Truncate to reasonable length (first few sentences)
|
||||
sentences = text.split('. ')
|
||||
summary = '. '.join(sentences[:wiki_return_limit])
|
||||
if summary and not summary.endswith('.'):
|
||||
summary += '.'
|
||||
return summary.strip()[:500] # Hard limit at 500 chars
|
||||
|
||||
# If direct access fails, try search
|
||||
search_url = f"{kiwix_url}/search?content={kiwix_library_name}&pattern={search_encoded}"
|
||||
response = requests.get(search_url, timeout=urlTimeoutSeconds)
|
||||
|
||||
if response.status_code == 200 and "No results were found" not in response.text:
|
||||
soup = BeautifulSoup(response.text, 'html.parser')
|
||||
links = [a['href'] for a in soup.find_all('a', href=True) if "start=" not in a['href']]
|
||||
|
||||
for link in links[:3]: # Check first 3 results
|
||||
article_name = link.split("/")[-1]
|
||||
if not article_name or article_name[0].islower():
|
||||
continue
|
||||
|
||||
article_url = f"{kiwix_url}{link}"
|
||||
article_response = requests.get(article_url, timeout=urlTimeoutSeconds)
|
||||
if article_response.status_code == 200:
|
||||
text = text_from_html(article_response.text)
|
||||
text = text.split("Jump to navigation", 1)[-1]
|
||||
text = text.split("Jump to search", 1)[-1]
|
||||
sentences = text.split('. ')
|
||||
summary = '. '.join(sentences[:wiki_return_limit])
|
||||
if summary and not summary.endswith('.'):
|
||||
summary += '.'
|
||||
return summary.strip()[:500]
|
||||
|
||||
logger.warning(f"System: No Kiwix Results for:{search_term}")
|
||||
# try to fall back to online Wikipedia if available
|
||||
return get_wikipedia_summary(search_term, force=True)
|
||||
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.warning(f"System: Kiwix connection error: {e}")
|
||||
return "Unable to connect to local wiki server"
|
||||
except Exception as e:
|
||||
logger.warning(f"System: Error with Kiwix for:{search_term} {e}")
|
||||
return ERROR_FETCHING_DATA
|
||||
for link in links[:3]: # Check first 3 results
|
||||
article_name = link.split("/")[-1]
|
||||
if not article_name or article_name[0].islower():
|
||||
continue
|
||||
|
||||
article_url = f"{kiwix_url}{link}"
|
||||
article_response = requests.get(article_url, timeout=urlTimeoutSeconds)
|
||||
if article_response.status_code == 200:
|
||||
text = text_from_html(article_response.text)
|
||||
text = text.split("Jump to navigation", 1)[-1]
|
||||
text = text.split("Jump to search", 1)[-1]
|
||||
sentences = text.split('. ')
|
||||
summary = '. '.join(sentences[:wiki_return_limit])
|
||||
if summary and not summary.endswith('.'):
|
||||
summary += '.'
|
||||
return summary.strip()[:500]
|
||||
|
||||
logger.warning(f"System: No Kiwix Results for:{search_term}")
|
||||
# try to fall back to online Wikipedia if available
|
||||
return get_wikipedia_summary(search_term, force=True)
|
||||
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.warning(f"System: Kiwix connection error: {e}")
|
||||
return "Unable to connect to local wiki server"
|
||||
# Fallback to online Wikipedia
|
||||
return get_wikipedia_summary(search_term, force=True)
|
||||
except Exception as e:
|
||||
logger.warning(f"System: Error with Kiwix for:{search_term} {e}")
|
||||
return ERROR_FETCHING_DATA
|
||||
|
||||
def get_wikipedia_summary(search_term, location=None, force=False):
|
||||
lat, lon = location if location else (None, None)
|
||||
# Use Kiwix if configured
|
||||
if use_kiwix_server and not force:
|
||||
return get_kiwix_summary(search_term)
|
||||
|
||||
|
||||
if not search_term or not search_term.strip():
|
||||
return ERROR_FETCHING_DATA
|
||||
|
||||
api_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{requests.utils.quote(search_term)}"
|
||||
headers = {
|
||||
"User-Agent": "MeshBot/1.0 (https://github.com/kkeeton/meshing-around; contact: youremail@example.com)"
|
||||
}
|
||||
try:
|
||||
# Otherwise use online Wikipedia
|
||||
wikipedia_search = wikipedia.search(search_term, results=3)
|
||||
wikipedia_suggest = wikipedia.suggest(search_term)
|
||||
#wikipedia_aroundme = wikipedia.geosearch(lat,lon, results=3)
|
||||
#logger.debug(f"System: Wikipedia Nearby:{wikipedia_aroundme}")
|
||||
response = requests.get(api_url, timeout=5, headers=headers)
|
||||
if response.status_code == 404:
|
||||
logger.warning(f"System: No Wikipedia Results for:{search_term}")
|
||||
return ERROR_FETCHING_DATA
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
# Check for error response from Wikipedia API
|
||||
if "extract" not in data or not data.get("extract"):
|
||||
logger.warning(f"System: Wikipedia API returned no extract for:{search_term} (data: {data})")
|
||||
return ERROR_FETCHING_DATA
|
||||
summary = data.get("extract")
|
||||
if not summary or not isinstance(summary, str) or not summary.strip():
|
||||
logger.warning(f"System: No summary found for:{search_term}")
|
||||
return ERROR_FETCHING_DATA
|
||||
sentences = [s for s in summary.split('. ') if s.strip()]
|
||||
if not sentences:
|
||||
logger.warning(f"System: Wikipedia summary split produced no sentences for:{search_term}")
|
||||
return ERROR_FETCHING_DATA
|
||||
summary = '. '.join(sentences[:wiki_return_limit])
|
||||
if summary and not summary.endswith('.'):
|
||||
summary += '.'
|
||||
return summary.strip()[:500]
|
||||
except Exception as e:
|
||||
logger.debug(f"System: Wikipedia search error for:{search_term} {e}")
|
||||
logger.warning(f"System: Wikipedia API error for:{search_term} {e}")
|
||||
return ERROR_FETCHING_DATA
|
||||
|
||||
# def get_wikipedia_summary(search_term, location=None, force=False):
|
||||
# lat, lon = location if location else (None, None)
|
||||
# # Use Kiwix if configured
|
||||
# if use_kiwix_server and not force:
|
||||
# return get_kiwix_summary(search_term)
|
||||
|
||||
if len(wikipedia_search) == 0:
|
||||
logger.warning(f"System: No Wikipedia Results for:{search_term}")
|
||||
return ERROR_FETCHING_DATA
|
||||
# try:
|
||||
# # Otherwise use online Wikipedia
|
||||
# wikipedia_search = wikipedia.search(search_term, results=3)
|
||||
# wikipedia_suggest = wikipedia.suggest(search_term)
|
||||
# #wikipedia_aroundme = wikipedia.geosearch(lat,lon, results=3)
|
||||
# #logger.debug(f"System: Wikipedia Nearby:{wikipedia_aroundme}")
|
||||
# except Exception as e:
|
||||
# logger.debug(f"System: Wikipedia search error for:{search_term} {e}")
|
||||
# return ERROR_FETCHING_DATA
|
||||
|
||||
try:
|
||||
logger.debug(f"System: Searching Wikipedia for:{search_term}, First Result:{wikipedia_search[0]}, Suggest Word:{wikipedia_suggest}")
|
||||
summary = wikipedia.summary(search_term, sentences=wiki_return_limit, auto_suggest=False, redirect=True)
|
||||
except wikipedia.DisambiguationError as e:
|
||||
logger.warning(f"System: Disambiguation Error for:{search_term} trying {wikipedia_search[0]}")
|
||||
summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
|
||||
except wikipedia.PageError as e:
|
||||
logger.warning(f"System: Wikipedia Page Error for:{search_term} {e} trying {wikipedia_search[0]}")
|
||||
summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
|
||||
except Exception as e:
|
||||
logger.warning(f"System: Error with Wikipedia for:{search_term} {e}")
|
||||
return ERROR_FETCHING_DATA
|
||||
# if len(wikipedia_search) == 0:
|
||||
# logger.warning(f"System: No Wikipedia Results for:{search_term}")
|
||||
# return ERROR_FETCHING_DATA
|
||||
|
||||
return summary
|
||||
# try:
|
||||
# logger.debug(f"System: Searching Wikipedia for:{search_term}, First Result:{wikipedia_search[0]}, Suggest Word:{wikipedia_suggest}")
|
||||
# summary = wikipedia.summary(search_term, sentences=wiki_return_limit, auto_suggest=False, redirect=True)
|
||||
# except wikipedia.DisambiguationError as e:
|
||||
# logger.warning(f"System: Disambiguation Error for:{search_term} trying {wikipedia_search[0]}")
|
||||
# summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
|
||||
# except wikipedia.PageError as e:
|
||||
# logger.warning(f"System: Wikipedia Page Error for:{search_term} {e} trying {wikipedia_search[0]}")
|
||||
# summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
|
||||
# except Exception as e:
|
||||
# logger.warning(f"System: Error with Wikipedia for:{search_term} {e}")
|
||||
# return ERROR_FETCHING_DATA
|
||||
|
||||
# return summary
|
||||
@@ -8,5 +8,4 @@ beautifulsoup4
|
||||
dadjokes
|
||||
geopy
|
||||
schedule
|
||||
wikipedia
|
||||
googlesearch-python
|
||||
|
||||
Reference in New Issue
Block a user