From 47dd75bfb3943c055a41c54a74fa4d5f36bd4fc3 Mon Sep 17 00:00:00 2001 From: SpudGunMan Date: Fri, 31 Oct 2025 16:02:14 -0700 Subject: [PATCH] autoBlock, enhance ban list and such https://github.com/SpudGunMan/meshing-around/issues/252 # Enable or disable automatic banning of nodes autoBanEnabled = False --- config.template | 6 ++++ mesh_bot.py | 36 +++++++++++++++++---- modules/settings.py | 4 +++ modules/system.py | 79 ++++++++++++++++++++++++++++++++++++++++++--- pong_bot.py | 27 +++++++++++++--- 5 files changed, 136 insertions(+), 16 deletions(-) diff --git a/config.template b/config.template index 1e8bc17..dcfb711 100644 --- a/config.template +++ b/config.template @@ -481,3 +481,9 @@ DEBUGpacket = False # metaPacket detailed logging, the filter negates the port ID debugMetadata = False metadataFilter = TELEMETRY_APP,POSITION_APP +# Enable or disable automatic banning of nodes +autoBanEnabled = False +# Number of offenses before auto-ban +autoBanThreshold = 5 +# Timeframe for offenses (in seconds) +autoBanTimeframe = 3600 \ No newline at end of file diff --git a/mesh_bot.py b/mesh_bot.py index 51e8069..d6a5b12 100755 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -249,7 +249,11 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann global multiPing myNodeNum = globals().get(f'myNodeNum{deviceID}', 777) if "?" in message and isDM: - return message.split("?")[0].title() + " command returns SNR and RSSI, or hopcount from your message. Try adding e.g. @place or #tag" + pingHelp = "🤖Ping Command Help:\n" \ + "🏓 Send 'ping' or 'ack' or 'test' to get a response.\n" \ + "🏓 Send 'ping ' to get multiple pings in DM" + "🏓 ping @USERID to send a Joke from the bot" + return pingHelp msg = "" type = '' @@ -330,8 +334,11 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann # no autoping in channels pingCount = 1 - if pingCount > 51: + if pingCount > 51 and pingCount <= 101: pingCount = 50 + if pingCount > 800: + ban_hammer(message_from_id, deviceID, reason="Excessive auto-ping request") + return "🚫⛔️auto-ping request denied." except ValueError: pingCount = -1 @@ -358,7 +365,8 @@ def handle_emergency(message_from_id, deviceID, message): # if user in bbs_ban_list return if str(message_from_id) in my_settings.bbs_ban_list: # silent discard - logger.warning(f"System: {message_from_id} on spam list, no emergency responder alert sent") + hammer_value = ban_hammer(message_from_id, deviceID, reason="Emergency Alert from banned node") + logger.warning(f"System: {message_from_id} on spam list, no emergency responder alert sent. Ban hammer value: {hammer_value}") return '' # trgger alert to emergency_responder_alert_channel if message_from_id != 0: @@ -1649,6 +1657,10 @@ def handle_boot(mesh=True): if my_settings.useDMForResponse: logger.debug("System: Respond by DM only") + if my_settings.autoBanEnabled: + logger.debug(f"System: Auto-Ban Enabled for {my_settings.autoBanThreshold} messages in {my_settings.autoBanTimeframe} seconds") + load_bbsBanList() + if my_settings.log_messages_to_file: logger.debug("System: Logging Messages to disk") if my_settings.syslog_to_file: @@ -1781,9 +1793,14 @@ def onReceive(packet, interface): message_from_id = packet['from'] # if message_from_id is not in the seenNodes list add it - if not any(node['nodeID'] == message_from_id for node in seenNodes): - seenNodes.append({'nodeID': message_from_id, 'rxInterface': rxNode, 'channel': channel_number, 'welcome': False, 'lastSeen': time.time()}) - + if not any(node.get('nodeID') == message_from_id for node in seenNodes): + seenNodes.append({'nodeID': message_from_id, 'rxInterface': rxNode, 'channel': channel_number, 'welcome': False, 'first_seen': time.time(), 'lastSeen': time.time()}) + else: + # update lastSeen time + for node in seenNodes: + if node.get('nodeID') == message_from_id: + node['lastSeen'] = time.time() + break # BBS DM MAIL CHECKER if bbs_enabled and 'decoded' in packet: msg = bbs_check_dm(message_from_id) @@ -1792,7 +1809,12 @@ def onReceive(packet, interface): message = "Mail: " + msg[1] + " From: " + get_name_from_number(msg[2], 'long', rxNode) bbs_delete_dm(msg[0], msg[1]) send_message(message, channel_number, message_from_id, rxNode) - + + # CHECK with ban_hammer() if the node is banned + if str(message_from_id) in my_settings.bbs_ban_list or str(message_from_id) in my_settings.autoBanlist: + logger.warning(f"System: Banned Node {message_from_id} tried to send a message. Ignored. Try adding to node firmware-blocklist") + return + # handle TEXT_MESSAGE_APP try: if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': diff --git a/modules/settings.py b/modules/settings.py index d0da95c..c8d6d42 100644 --- a/modules/settings.py +++ b/modules/settings.py @@ -35,6 +35,7 @@ voxMsgQueue = [] # queue for VOX detected messages tts_read_queue = [] # queue for TTS messages wsjtxMsgQueue = [] # queue for WSJT-X detected messages js8callMsgQueue = [] # queue for JS8Call detected messages +autoBanlist = [] # list of nodes to autoban for repeated offenses # Game trackers surveyTracker = [] # Survey game tracker tictactoeTracker = [] # TicTacToe game tracker @@ -494,6 +495,9 @@ try: noisyNodeLogging = config['messagingSettings'].getboolean('noisyNodeLogging', False) # default False logMetaStats = config['messagingSettings'].getboolean('logMetaStats', True) # default True noisyTelemetryLimit = config['messagingSettings'].getint('noisyTelemetryLimit', 5) # default 5 packets + autoBanEnabled = config['messagingSettings'].getboolean('autoBanEnabled', False) # default False + autoBanThreshold = config['messagingSettings'].getint('autoBanThreshold', 5) # default 5 offenses + autoBanTimeframe = config['messagingSettings'].getint('autoBanTimeframe', 3600) # default 1 hour in seconds except Exception as e: print(f"System: Error reading config file: {e}") print("System: Check the config.ini against config.template file for missing sections or values.") diff --git a/modules/system.py b/modules/system.py index 9392891..87513f9 100644 --- a/modules/system.py +++ b/modules/system.py @@ -953,7 +953,6 @@ def messageTrap(msg): def stringSafeCheck(s): # Check if a string is safe to use, no control characters or non-printable characters - soFarSoGood = True if not all(c.isprintable() or c.isspace() for c in s): return False if any(ord(c) < 32 and c not in '\n\r\t' for c in s): @@ -962,10 +961,79 @@ def stringSafeCheck(s): return False if len(s) > 1000: return False - injection_chars = [';', '|', '../'] - if any(char in s for char in injection_chars): + # Check for single-character injections + single_injection_chars = [';', '|', '}', '>', ')'] + if any(c in s for c in single_injection_chars): return False - return soFarSoGood + # Check for multi-character patterns + multi_injection_patterns = ['../', '||'] + if any(pattern in s for pattern in multi_injection_patterns): + return False + return True + +def ban_hammer(node_id, rxInterface=None, channel=None, reason=""): + """ + Auto-ban nodes that exceed the message threshold within the timeframe. + Returns True if the node is (or becomes) banned, False otherwise. + """ + global autoBanlist, seenNodes, bbs_ban_list + + current_time = time.time() + node_id_str = str(node_id) + + # Check if the node is already banned + if node_id_str in bbs_ban_list or node_id_str in autoBanlist: + return True # Node is already banned + + # if no reason provided, dont ban just run that last check + if reason == "": + return False + + # Find or create the seenNodes entry (patched for missing 'node_id') + node_entry = next((entry for entry in seenNodes if entry.get('node_id') == node_id_str), None) + if node_entry: + # Update interface and channel if provided + if rxInterface is not None: + node_entry['rxInterface'] = rxInterface + if channel is not None: + node_entry['channel'] = channel + # Check if the timeframe has expired + if (current_time - node_entry['lastSeen']) > autoBanTimeframe: + node_entry['auto_ban_count'] = 1 + node_entry['lastSeen'] = current_time + else: + node_entry['auto_ban_count'] += 1 + node_entry['lastSeen'] = current_time + else: + # node not found, create a new entry + entry = { + 'node_id': node_id_str, + 'first_seen': current_time, + 'lastSeen': current_time, + 'auto_ban_count': 3, # start at 3 to trigger ban faster + 'rxInterface': rxInterface, + 'channel': channel, + 'welcome': False + } + seenNodes.append(entry) + node_entry = entry + + # Check if the node has exceeded the ban threshold + if node_entry['auto_ban_count'] < autoBanThreshold: + logger.debug(f"System: Node {node_id_str} auto-ban count: {node_entry['auto_ban_count']}") + return False # No ban applied + + # If the node has exceeded the ban threshold within the time window + autoBanlist.append(node_id_str) + logger.info(f"System: Node {node_id_str} exceeded auto-ban threshold with {node_entry['auto_ban_count']} messages") + if autoBanEnabled: + logger.warning(f"System: Auto-banned node {node_id_str} Reason: {reason}") + if node_id_str not in bbs_ban_list: + bbs_ban_list.append(node_id_str) + save_bbsBanList() + return True # Node is now banned + + return False # No ban applied def save_bbsBanList(): # save the bbs_ban_list to file @@ -983,7 +1051,7 @@ def load_bbsBanList(): try: with open('data/bbs_ban_list.txt', 'r') as f: loaded_list = [line.strip() for line in f if line.strip()] - logger.debug("System: BBS ban list loaded from file") + logger.debug(f"System: BBS ban list now has {len(loaded_list)} entries loaded from file") except FileNotFoundError: config_val = config['bbs'].get('bbs_ban_list', '') if config_val: @@ -1013,6 +1081,7 @@ def isNodeBanned(nodeID): return False def handle_bbsban(message, message_from_id, isDM): + global bbs_ban_list msg = "" if not isDM: return "🤖only available in a Direct Message📵" diff --git a/pong_bot.py b/pong_bot.py index 945fd3e..ee81d18 100755 --- a/pong_bot.py +++ b/pong_bot.py @@ -65,7 +65,11 @@ def handle_cmd(message, message_from_id, deviceID): def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number): global multiPing if "?" in message and isDM: - return message.split("?")[0].title() + " command returns SNR and RSSI, or hopcount from your message. Try adding e.g. @place or #tag" + pingHelp = "🤖Ping Command Help:\n" \ + "🏓 Send 'ping' or 'ack' or 'test' to get a response.\n" \ + "🏓 Send 'ping ' to get multiple pings in DM" + "🏓 ping @USERID to send a Joke from the bot" + return pingHelp msg = "" type = '' @@ -303,10 +307,21 @@ def onReceive(packet, interface): # set the message_from_id message_from_id = packet['from'] - # check if the packet has a channel flag use it - if packet.get('channel'): - channel_number = packet.get('channel', 0) + # if message_from_id is not in the seenNodes list add it + if not any(node.get('nodeID') == message_from_id for node in seenNodes): + seenNodes.append({'nodeID': message_from_id, 'rxInterface': rxNode, 'channel': channel_number, 'welcome': False, 'first_seen': time.time(), 'lastSeen': time.time()}) + else: + # update lastSeen time + for node in seenNodes: + if node.get('nodeID') == message_from_id: + node['lastSeen'] = time.time() + break + # CHECK with ban_hammer() if the node is banned + if str(message_from_id) in my_settings.bbs_ban_list or str(message_from_id) in my_settings.autoBanlist: + logger.warning(f"System: Banned Node {message_from_id} tried to send a message. Ignored. Try adding to node firmware-blocklist") + return + # handle TEXT_MESSAGE_APP try: if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': @@ -574,6 +589,10 @@ def handle_boot(mesh=True): if my_settings.useDMForResponse: logger.debug("System: Respond by DM only") + if my_settings.autoBanEnabled: + logger.debug(f"System: Auto-Ban Enabled for {my_settings.autoBanThreshold} messages in {my_settings.autoBanTimeframe} seconds") + load_bbsBanList() + if my_settings.log_messages_to_file: logger.debug("System: Logging Messages to disk") if my_settings.syslog_to_file: