Files
meshing-around/modules/smtp.py
T
SpudGunMan ae1a3040b5 patches
dont need no stinking patches. thanks again.
2025-10-06 12:54:41 -07:00

273 lines
10 KiB
Python

# SMTP module for the meshing-around bot
# 2024 Idea and code bits from https://github.com/tremmert81
# https://avtech.com/articles/138/list-of-email-to-sms-addresses/
# 2024 Kelly Keeton K7MHI
from modules.log import *
import pickle
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# System variables
trap_list_smtp = ("email:", "setemail", "sms:", "setsms", "clearsms")
smtpThrottle = {}
SMTP_TIMEOUT = 10
if enableImap:
# Import IMAP library
import imaplib
import email
# Send email
def send_email(to_email, message, nodeID=0):
global smtpThrottle
# Clean up email address
to_email = to_email.strip()
# Basic email validation
if "@" not in to_email or "." not in to_email:
logger.warning(f"System: Invalid email address format: {to_email}")
return False
# throttle email to prevent abuse
if to_email in smtpThrottle:
if smtpThrottle[to_email] > time.time() - 120:
logger.warning("System: Email throttled for " + to_email[:-6])
return "⛔️Email throttled, try again later"
smtpThrottle[to_email] = time.time()
# check if email is in the ban list
if nodeID in bbs_ban_list:
logger.warning("System: Email blocked for " + str(nodeID))
return "⛔️Email throttled, try again later"
# Send email
try:
# Create message
msg = MIMEMultipart()
msg['From'] = FROM_EMAIL
msg['To'] = to_email
msg['Subject'] = EMAIL_SUBJECT
msg.attach(MIMEText(message, 'plain'))
# Connect to SMTP server
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT)
try:
# login /auth
if SMTP_PORT == 587:
server.starttls()
if SMTP_AUTH:
server.login(SMTP_USERNAME, SMTP_PASSWORD)
except Exception as e:
logger.warning(f"System: Failed to login to SMTP server: {str(e)}")
return
# Send email; this command will hold the program until the email is sent
server.send_message(msg)
server.quit()
logger.info("System: Email sent to: " + to_email[:-6])
return True
except Exception as e:
logger.warning(f"System: Failed to send email: {str(e)}")
return False
def check_email(nodeID, sysop=False):
if not enableImap:
return
try:
# Connect to IMAP server
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT, timeout=SMTP_TIMEOUT)
mail.login(IMAP_USERNAME, IMAP_PASSWORD)
mail.select(IMAP_FOLDER)
# Search for new emails
status, data = mail.search(None, 'UNSEEN')
if status == 'OK':
for num in data[0].split():
status, data = mail.fetch(num, '(RFC822)')
if status == 'OK':
email_message = email.message_from_bytes(data[0][1])
email_from = email_message['from']
email_subject = email_message['subject']
email_body = ""
if not sysop:
# Check if email is whitelisted by particpant in the mesh
for address in sms_db[nodeID]:
if address in email_from:
email_body = email_message.get_payload()
logger.info("System: Email received from: " + email_from[:-6] + " for " + str(nodeID))
return email_body.strip()
else:
# Check if email is from sysop
for address in sysopEmails:
if address in email_from:
email_body = email_message.get_payload()
logger.info("System: SysOp Email received from: " + email_from[:-6] + " for sysop")
return email_body.strip()
except Exception as e:
logger.warning("System: Failed to check email: " + str(e))
return False
# initalize email db
email_db = {}
try:
with open('data/email_db.pickle', 'rb') as f:
email_db = pickle.load(f)
except:
logger.warning("System: Email db not found, creating a new one")
with open('data/email_db.pickle', 'wb') as f:
pickle.dump(email_db, f)
def store_email(nodeID, email):
global email_db
# if not in db, add it
logger.debug("System: Setting E-Mail for " + str(nodeID))
email_db[nodeID] = email
# save to a pickle for persistence, this is a simple db, be mindful of risk
with open('data/email_db.pickle', 'wb') as f:
pickle.dump(email_db, f)
f.close()
return True
# initalize SMS db
sms_db = [{'nodeID': 0, 'sms':[]}]
try:
with open('data/sms_db.pickle', 'rb') as f:
sms_db = pickle.load(f)
except:
logger.warning("System: SMS db not found, creating a new one")
with open('data/sms_db.pickle', 'wb') as f:
pickle.dump(sms_db, f)
def store_sms(nodeID, sms):
global sms_db
try:
logger.debug("System: Setting SMS for " + str(nodeID))
# if the nodeID has over 5 sms addresses warn and return
for item in sms_db:
if item['nodeID'] == nodeID:
if len(item['sms']) >= 5:
logger.warning("System: 📵SMS limit reached for " + str(nodeID))
return False
# if not in db, add it
if nodeID not in sms_db:
sms_db.append({'nodeID': nodeID, 'sms': sms})
else:
# if in db, update it
for item in sms_db:
if item['nodeID'] == nodeID:
item['sms'].append(sms)
# save to a pickle for persistence, this is a simple db, be mindful of risk
with open('data/sms_db.pickle', 'wb') as f:
pickle.dump(sms_db, f)
f.close()
return True
except Exception as e:
logger.warning("System: Failed to store SMS: " + str(e))
return False
def handle_sms(nodeID, message):
global sms_db
# if clearsms, remove all sms for node
if message.lower().startswith("clearsms"):
if any(item['nodeID'] == nodeID for item in sms_db):
# remove record from db for nodeID
sms_db = [item for item in sms_db if item['nodeID'] != nodeID]
# update the pickle
with open('data/sms_db.pickle', 'wb') as f:
pickle.dump(sms_db, f)
f.close()
return "📲 address cleared"
return "📲No address to clear"
# send SMS to SMS in db. if none ask for one
if message.lower().startswith("setsms"):
message = message.split(" ", 1)
if len(message[1]) < 5:
return "?📲setsms: example@phone.co"
if "@" not in message[1] and "." not in message[1]:
return "📲Please provide a valid email address"
if store_sms(nodeID, message[1]):
return "📲SMS address set 📪"
else:
return "⛔️Failed to set address"
if message.lower().startswith("sms:"):
message = message.split(" ", 1)
if any(item['nodeID'] == nodeID for item in sms_db):
count = 0
# for all dict items maching nodeID in sms_db send sms
for item in sms_db:
if item['nodeID'] == nodeID:
smsEmail = item['sms']
logger.info("System: Sending SMS for " + str(nodeID) + " to " + smsEmail[:-6])
if send_email(smsEmail, message[1], nodeID):
count += 1
else:
return "⛔️Failed to send SMS"
return "📲SMS sent " + str(count) + " addresses 📤"
else:
return "📲No address set, use 📲setsms"
return "Error: ⛔️ not understood. use:setsms example@phone.co"
def handle_email(nodeID, message):
global email_db
try:
# send email to email in db. if none ask for one
if message.lower().startswith("setemail"):
message = message.split(" ", 1)
if len(message) < 2:
return "📧Please provide an email address"
email_addr = message[1].strip()
if "@" not in email_addr or "." not in email_addr:
return "📧Please provide a valid email address"
if store_email(nodeID, email_addr):
return "📧Email address set 📪"
return "Error: ⛔️ Failed to set email address"
if message.lower().startswith("email:"):
parts = message.split(" ", 1)
if len(parts) < 2:
return "Error: ⛔️ format should be: email: message or, email: address@example.com #message"
content = parts[1].strip()
# Check if this is a direct email with address
if "@" in content and "#" in content:
# Split into email and message
addr_msg = content.split("#", 1)
if len(addr_msg) != 2:
return "Error: ⛔️ Message format should be: email: address@example.com #message"
to_email = addr_msg[0].strip()
message_body = addr_msg[1].strip()
logger.info(f"System: Sending email for {nodeID} to {to_email}")
if send_email(to_email, message_body, nodeID):
return "📧Email-sent 📤"
return "⛔️Failed to send email"
# Using stored email address
elif nodeID in email_db:
logger.info(f"System: Sending email for {nodeID} to stored address")
if send_email(email_db[nodeID], content, nodeID):
return "📧Email-sent 📤"
return "⛔️Failed to send email"
return "Error: ⛔️ no email on file. use: setemail"
except Exception as e:
logger.error(f"System: Email handling error: {str(e)}")
return "⛔️Failed to process email command"