Survey Says!

is this cool?
This commit is contained in:
SpudGunMan
2025-10-07 22:39:08 -07:00
parent ddb9c8b4bf
commit 6c078b4d17
8 changed files with 268 additions and 6 deletions

3
.gitignore vendored
View File

@@ -27,3 +27,6 @@ data/qrz.db
news.txt
alert.txt
bee.txt
# .csv files
*.csv

View File

@@ -334,7 +334,16 @@ golfsim = True
hangman = True
hamtest = True
tictactoe = True
quiz = True
# enable or disable the quiz game module questions are in data/quiz.json
quiz = False
# enable or disable the survey game module questions are in data/survey/survey.json
survey = False
# Whether to record user ID in responses
surveyRecordID=True
# Whether to record location on start of survey
surveyRecordLocation=True
[messagingSettings]
# delay in seconds for response to avoid message collision /throttling

View File

@@ -0,0 +1,15 @@
[
{
"type": "multiple_choice",
"question": "How Did you hear about us?",
"options": ["Meshtastic", "Discord", "Friend", "Other"]
},
{
"type": "integer",
"question": "How many nodes do you own?"
},
{
"type": "text",
"question": "What feature would you like to see next?"
}
]

View File

@@ -0,0 +1,15 @@
[
{
"type": "multiple_choice",
"question": "How often do you experience snowfall in your area?",
"options": ["Never", "Rarely", "Sometimes", "Often", "Every winter"]
},
{
"type": "integer",
"question": "What was the deepest snowfall (in inches) you've measured at your location?"
},
{
"type": "text",
"question": "Describe any challenges you face during heavy snowfall."
}
]

View File

@@ -15,10 +15,8 @@ from modules.log import *
from modules.system import *
# list of commands to remove from the default list for DM only
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "quiz", "q:"]
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "quiz", "q:", "survey", "s:"]
restrictedResponse = "🤖only available in a Direct Message📵" # "" for none
cmdHistory = [] # list to hold the command history for lheard and history commands
msg_history = [] # list to hold the message history for the messages command
def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM):
global cmdHistory, msg_history
@@ -87,6 +85,8 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n
"sms:": lambda: handle_sms(message_from_id, message),
"solar": lambda: drap_xray_conditions() + "\n" + solar_conditions(),
"sun": lambda: handle_sun(message_from_id, deviceID, channel_number),
"survey": lambda: surveyHandler(message, message_from_id, deviceID),
"s:": lambda: surveyHandler(message, message_from_id, deviceID),
"sysinfo": lambda: sysinfo(message, message_from_id, deviceID),
"test": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number),
"testing": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number),
@@ -909,6 +909,40 @@ def quizHandler(message, nodeID, deviceID):
else:
return "🧠Please provide an answer or command, or send q: ?"
def surveyHandler(message, nodeID, deviceID):
global surveyTracker
location = get_node_location(nodeID, deviceID)
if "survey " in message.lower():
surveySays = message.lower().strip().split("survey ", 1)
elif "s:" in message.lower():
surveySays = message.lower().strip().split("s:", 1)
else:
surveySays = [message.lower().strip()]
survey = surveySays[1] if len(surveySays) > 1 else "example"
if surveySays[0].strip().lower() == "end":
msg = survey_module.end_survey(user_id=nodeID)
return msg
# Update last played or add new tracker entry
found = False
for i in range(len(surveyTracker)):
if surveyTracker[i].get('nodeID') == nodeID:
surveyTracker[i]['last_played'] = time.time()
found = True
break
if not found:
surveyTracker.append({'nodeID': nodeID, 'last_played': time.time()})
# If not in survey session, start one
if nodeID not in survey_module.responses:
msg = survey_module.start_survey(user_id=nodeID, survey_name=survey, location=location)
else:
msg = survey_module.answer(user_id=nodeID, answer=surveySays[1].strip())
return msg
def handle_riverFlow(message, message_from_id, deviceID):
location = get_node_location(message_from_id, deviceID)
@@ -1251,6 +1285,7 @@ def checkPlayingGame(message_from_id, message_string, rxNode, channel_number):
(hangmanTracker, "Hangman", handleHangman) if 'hangmanTracker' in globals() else None,
(hamtestTracker, "HamTest", handleHamtest) if 'hamtestTracker' in globals() else None,
(tictactoeTracker, "TicTacToe", handleTicTacToe) if 'tictactoeTracker' in globals() else None,
(surveyTracker, "Survey", surveyHandler) if 'surveyTracker' in globals() else None,
#quiz does not use a tracker (quizGamePlayer) always active
]
trackers = [tracker for tracker in trackers if tracker is not None]

View File

@@ -28,6 +28,9 @@ wiki_return_limit = 3 # limit the number of sentences returned off the first par
GAMEDELAY = 28800 # 8 hours in seconds for game mode holdoff
cmdHistory = [] # list to hold the last commands
seenNodes = [] # list to hold the last seen nodes
surveyTracker, tictactoeTracker, hamtestTracker, hangmanTracker, golfTracker, mastermindTracker, vpTracker, blackjackTracker, lemonadeTracker, dwPlayerTracker = ([], [], [], [], [], [], [], [], [], [])
cmdHistory = [] # list to hold the command history for lheard and history commands
msg_history = [] # list to hold the message history for the messages command
# Read the config file, if it does not exist, create basic config file
config = configparser.ConfigParser()
@@ -373,7 +376,10 @@ try:
hangman_enabled = config['games'].getboolean('hangman', True)
hamtest_enabled = config['games'].getboolean('hamtest', True)
tictactoe_enabled = config['games'].getboolean('tictactoe', True)
quiz_enabled = config['games'].getboolean('quiz', True)
quiz_enabled = config['games'].getboolean('quiz', False)
survey_enabled = config['games'].getboolean('survey', True)
surveyRecordID = config['games'].getboolean('surveyRecordID', True)
surveyRecordLocation = config['games'].getboolean('surveyRecordLocation', True)
# messaging settings
responseDelay = config['messagingSettings'].getfloat('responseDelay', 0.7) # default 0.7

173
modules/survey.py Normal file
View File

@@ -0,0 +1,173 @@
# Survey Module for meshbot 2025
# Provides a survey function to collect responses and put into a CSV file
import json
import os # For file operations
from collections import Counter
from modules.log import *
allowedSurveys = [] # List of allowed survey names
trap_list_survey = ("survey", "s:")
class SurveyModule:
def __init__(self):
self.base_dir = os.path.dirname(__file__)
self.survey_dir = os.path.join(self.base_dir, '..', 'data', 'surveys') # Directory for survey JSON files
self.response_dir = os.path.join(self.base_dir, '..', 'data', 'surveys') # Directory for survey response CSV files
self.surveys = {}
self.responses = {}
self.load_surveys()
def load_surveys(self):
"""Load all surveys from the surveys directory with _survey.json suffix."""
global allowedSurveys
allowedSurveys.clear()
for filename in os.listdir(self.survey_dir):
if filename.endswith('_survey.json'):
survey_name = filename[:-12] # Remove '_survey.json'
allowedSurveys.append(survey_name)
path = os.path.join(self.survey_dir, filename)
try:
with open(path, encoding='utf-8') as f:
self.surveys[survey_name] = json.load(f)
except FileNotFoundError:
logger.error(f"File not found: {path}")
self.surveys[survey_name] = []
except json.JSONDecodeError:
logger.error(f"Error decoding JSON from file: {path}")
self.surveys[survey_name] = []
def start_survey(self, user_id, survey_name='example', location=None):
"""Begin a new survey session for a user."""
if not survey_name:
survey_name = 'example'
if survey_name not in allowedSurveys:
return f"error: survey '{survey_name}' is not allowed."
self.responses[user_id] = {
'survey_name': survey_name,
'current_question': 0,
'answers': [],
'location': location if surveyRecordLocation and location is not None else 'N/A'
}
msg = f"'{survey_name}'📝survey use 's: <answer>' 'end' to exit."
msg += self.show_question(user_id)
return msg
def show_question(self, user_id):
"""Show the current question for the user, or end the survey."""
survey_name = self.responses[user_id]['survey_name']
current = self.responses[user_id]['current_question']
questions = self.surveys.get(survey_name, [])
if current >= len(questions):
return self.end_survey(user_id)
question = questions[current]
msg = f"{question['question']}\n"
if question.get('type', 'multiple_choice') == 'multiple_choice':
for i, option in enumerate(question['options']):
msg += f"{chr(65+i)}. {option}\n"
elif question['type'] == 'integer':
msg += "(Please enter a number)\n"
elif question['type'] == 'text':
msg += "(Please enter your response)\n"
return msg
def save_responses(self, user_id):
"""Save user responses to a CSV file."""
survey_name = self.responses[user_id]['survey_name']
if survey_name not in self.surveys:
logger.warning(f"Survey '{survey_name}' not loaded. Responses not saved.")
return
filename = os.path.join(self.response_dir, f'{survey_name}_responses.csv')
try:
with open(filename, 'a', encoding='utf-8') as f:
row = list(map(str, self.responses[user_id]['answers']))
if surveyRecordID:
row.insert(0, str(user_id))
if surveyRecordLocation:
location = self.responses[user_id].get('location')
row.insert(1 if surveyRecordID else 0, str(location) if location is not None else "N/A")
f.write(','.join(row) + '\n')
logger.info(f"Responses saved to {filename}")
except Exception as e:
logger.error(f"Error saving responses to {filename}: {e}")
def answer(self, user_id, answer, location=None):
"""Record an answer and return the next question or end message."""
if user_id not in self.responses:
return self.start_survey(user_id, location=location)
question_index = self.responses[user_id]['current_question']
survey_name = self.responses[user_id]['survey_name']
questions = self.surveys.get(survey_name, [])
if question_index < 0 or question_index >= len(questions):
return "No current question to answer."
question = questions[question_index]
qtype = question.get('type', 'multiple_choice')
if qtype == 'multiple_choice':
answer_char = answer.strip().upper()[:1]
if len(answer_char) != 1 or not answer_char.isalpha():
return "Please answer with a letter (A, B, C, ...)."
option_index = ord(answer_char) - 65
if 0 <= option_index < len(question['options']):
self.responses[user_id]['answers'].append(str(option_index))
self.responses[user_id]['current_question'] += 1
return f"Recorded..\n" + self.show_question(user_id)
else:
print(f"Invalid option index {option_index} for question with {len(question['options'])} options. user entered '{answer}'")
return "Invalid answer option. Please try again."
elif qtype == 'integer':
try:
int_answer = int(answer)
self.responses[user_id]['answers'].append(str(int_answer))
self.responses[user_id]['current_question'] += 1
return f"Recorded..\n" + self.show_question(user_id)
except ValueError:
return "Please enter a valid integer."
elif qtype == 'text':
self.responses[user_id]['answers'].append(answer.strip())
self.responses[user_id]['current_question'] += 1
return f"Recorded..\n" + self.show_question(user_id)
else:
return f"error: unknown question type '{qtype}' and cannot record answer '{answer}'"
def end_survey(self, user_id):
"""End the survey for the user and save responses."""
self.save_responses(user_id)
self.responses.pop(user_id, None)
return "✅ Survey complete. Thank you for your responses!"
def quiz_report(self, survey_name='example'):
"""
Generate a quick poll report: counts of each answer per question.
Returns a string summary.
"""
filename = os.path.join(self.response_dir, f'{survey_name}_responses.csv')
questions = self.surveys.get(survey_name, [])
if not questions:
logger.warning(f"No survey found for '{survey_name}'.")
return f"No survey found for '{survey_name}'."
all_answers = []
try:
with open(filename, encoding='utf-8') as f:
for line in f:
parts = line.strip().split(',')
if surveyRecordID:
answers = [int(x) for x in parts[1:] if x.strip().isdigit()]
else:
answers = [int(x) for x in parts if x.strip().isdigit()]
all_answers.append(answers)
except FileNotFoundError:
logger.info(f"No responses recorded yet for '{survey_name}'.")
return "No responses recorded yet."
report = f"📊 Poll Report for '{survey_name}':\n"
for q_idx, question in enumerate(questions):
counts = Counter(ans[q_idx] for ans in all_answers if len(ans) > q_idx)
report += f"\nQ{q_idx+1}: {question['question']}\n"
for opt_idx, option in enumerate(question.get('options', [])):
count = counts.get(opt_idx, 0)
report += f" {chr(65+opt_idx)}. {option}: {count}\n"
return report
# Initialize the survey module
survey_module = SurveyModule()

View File

@@ -66,7 +66,7 @@ def cleanup_game_trackers(current_time):
tracker_names = [
'dwPlayerTracker', 'lemonadeTracker', 'jackTracker',
'vpTracker', 'mindTracker', 'golfTracker',
'hangmanTracker', 'hamtestTracker', 'tictactoeTracker'
'hangmanTracker', 'hamtestTracker', 'tictactoeTracker, surveyTracker'
]
for tracker_name in tracker_names:
@@ -270,6 +270,12 @@ if quiz_enabled:
help_message = help_message + ", quiz"
games_enabled = True
if survey_enabled:
from modules.survey import * # from the spudgunman/meshing-around repo
trap_list = trap_list + trap_list_survey # items survey, s:
help_message = help_message + ", survey"
games_enabled = True
# Games Configuration
if games_enabled is True:
help_message = help_message + ", games"