Files
meshing-around/modules/smtp.py
SpudGunMan fa76a76203 BIG OLD PATCH 🍠
pz days ... haha. I hope this works.
fancy potato
2025-10-24 19:54:46 -07:00

278 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SMTP module for the meshing-around bot
# 2024 Idea and code bits from https://github.com/tremmert81
# https://avtech.com/articles/138/list-of-email-to-sms-addresses/
# 2024 Kelly Keeton K7MHI
from modules.log import logger
from modules.settings import (
SMTP_SERVER, SMTP_PORT, SMTP_AUTH, SMTP_USERNAME, SMTP_PASSWORD,
FROM_EMAIL, EMAIL_SUBJECT, enableImap, IMAP_SERVER, IMAP_PORT,
IMAP_USERNAME, IMAP_PASSWORD, IMAP_FOLDER, sysopEmails, bbs_ban_list
)
import pickle
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# System variables
# Command prefixes understood by handle_email() / handle_sms() below.
# NOTE(review): presumably consumed by the bot's message dispatcher - confirm against caller.
trap_list_smtp = ("email:", "setemail", "sms:", "setsms", "clearsms")
# Maps a recipient address to the time.time() stamp recorded by send_email(),
# which refuses to send to the same address again within 120 seconds.
smtpThrottle = {}
# Network timeout in seconds, passed to both the SMTP and the IMAP connection.
SMTP_TIMEOUT = 10
if enableImap:
    # Import IMAP library
    # Only imported when IMAP is enabled; check_email() returns early otherwise.
    import imaplib
    import email
# Send email
def send_email(to_email, message, nodeID=0):
    """Send a plain-text email through the configured SMTP server.

    Args:
        to_email: Recipient address; surrounding whitespace is stripped.
        message: Plain-text body of the email.
        nodeID: ID of the requesting node, checked against bbs_ban_list.

    Returns:
        True when the message was handed to the SMTP server.
        False when the address is malformed, or login/sending failed.
        The string "Email throttled, try again later" when the node is banned
        or the address was mailed less than 120 seconds ago. NOTE: this string
        is truthy, so callers must not treat any truthy result as "sent".
    """
    # Clean up email address
    to_email = to_email.strip()

    # Basic email validation
    if "@" not in to_email or "." not in to_email:
        logger.warning(f"System: Invalid email address format: {to_email}")
        return False

    # Check the ban list before touching the throttle table, so a banned node
    # cannot throttle an address for everybody else.
    if nodeID in bbs_ban_list:
        logger.warning("System: Email blocked for " + str(nodeID))
        return "Email throttled, try again later"

    # Throttle email to prevent abuse: one message per address per 120 seconds
    last_sent = smtpThrottle.get(to_email)
    if last_sent is not None and last_sent > time.time() - 120:
        logger.warning("System: Email throttled for " + to_email[:-6])
        return "Email throttled, try again later"
    smtpThrottle[to_email] = time.time()

    try:
        # Create message
        msg = MIMEMultipart()
        msg['From'] = FROM_EMAIL
        msg['To'] = to_email
        msg['Subject'] = EMAIL_SUBJECT
        msg.attach(MIMEText(message, 'plain'))

        # Connect to SMTP server
        server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT)
        try:
            try:
                # login / auth
                if SMTP_PORT == 587:
                    server.starttls()
                if SMTP_AUTH:
                    server.login(SMTP_USERNAME, SMTP_PASSWORD)
            except Exception as e:
                logger.warning(f"System: Failed to login to SMTP server: {str(e)}")
                return False

            # Send email; this call blocks until the server accepts the message
            server.send_message(msg)
        finally:
            # Always release the connection, including on login/send failure
            try:
                server.quit()
            except OSError:  # smtplib.SMTPException is a subclass of OSError
                server.close()

        logger.info("System: Email sent to: " + to_email[:-6])
        return True
    except Exception as e:
        logger.warning(f"System: Failed to send email: {str(e)}")
        return False
def check_email(nodeID, sysop=False):
    """Return the body of the first unseen email from an allowed sender.

    Args:
        nodeID: Node whose stored SMS/email gateway addresses (sms_db) form
            the sender whitelist when ``sysop`` is False.
        sysop: When True, the whitelist is ``sysopEmails`` instead.

    Returns:
        The stripped message body (str) of the first matching unseen email,
        None when IMAP is disabled or nothing matched, and False when the
        mailbox could not be checked.
    """
    if not enableImap:
        return

    mail = None
    try:
        # Build the sender whitelist. sms_db is a list of
        # {'nodeID': ..., 'sms': ...} records, so it must be scanned by
        # record rather than indexed by nodeID.
        if sysop:
            allowed = [address for address in sysopEmails if address]
        else:
            allowed = []
            for record in sms_db:
                if record['nodeID'] != nodeID:
                    continue
                stored = record['sms']
                if isinstance(stored, str):
                    stored = [stored]
                allowed.extend(address for address in stored if address)

        if not allowed:
            # Nothing can match; skip the fetch, which would mark mail as seen
            return None

        # Connect to IMAP server
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT, timeout=SMTP_TIMEOUT)
        mail.login(IMAP_USERNAME, IMAP_PASSWORD)
        mail.select(IMAP_FOLDER)

        # Search for new emails
        status, data = mail.search(None, 'UNSEEN')
        if status != 'OK':
            return None

        for num in data[0].split():
            status, fetched = mail.fetch(num, '(RFC822)')
            if status != 'OK':
                continue

            email_message = email.message_from_bytes(fetched[0][1])
            # A message without a From header yields None; treat it as empty
            email_from = str(email_message['from'] or "")
            if not any(address in email_from for address in allowed):
                continue

            if email_message.is_multipart():
                # get_payload() returns a list of parts here; use the first
                # text/plain part as the body
                email_body = ""
                for part in email_message.walk():
                    if part.get_content_type() == 'text/plain':
                        email_body = part.get_payload()
                        break
            else:
                email_body = email_message.get_payload()

            if sysop:
                logger.info("System: SysOp Email received from: " + email_from[:-6] + " for sysop")
            else:
                logger.info("System: Email received from: " + email_from[:-6] + " for " + str(nodeID))
            return email_body.strip()
    except Exception as e:
        logger.warning("System: Failed to check email: " + str(e))
        return False
    finally:
        # Always end the IMAP session, whatever happened above
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                # Best-effort cleanup; the connection may already be gone
                pass
# initialize email db: maps nodeID -> email address
# SECURITY NOTE: pickle.load can execute arbitrary code from the file it
# reads; data/email_db.pickle must only ever be written by this program.
email_db = {}
try:
    with open('data/email_db.pickle', 'rb') as f:
        email_db = pickle.load(f)
except Exception:
    # Missing or unreadable db: start over with an empty one on disk.
    # `except Exception` (not a bare except) lets Ctrl-C / SystemExit through.
    logger.warning("System: Email db not found, creating a new one")
    with open('data/email_db.pickle', 'wb') as f:
        pickle.dump(email_db, f)
def store_email(nodeID, email):
    """Remember ``email`` as the address for ``nodeID`` and persist the db.

    Args:
        nodeID: Node the address belongs to; any previous address is replaced.
        email: Email address to store.

    Returns:
        True when the db was updated and written to data/email_db.pickle,
        False when writing the file failed (the in-memory entry stays set,
        matching how store_sms behaves on a failed write).
    """
    logger.debug("System: Setting E-Mail for " + str(nodeID))
    # add or replace the entry for this node
    email_db[nodeID] = email
    # save to a pickle for persistence, this is a simple db, be mindful of risk
    try:
        with open('data/email_db.pickle', 'wb') as f:
            pickle.dump(email_db, f)
    except Exception as e:
        logger.warning("System: Failed to store E-Mail: " + str(e))
        return False
    return True
# initialize SMS db: a list of {'nodeID': ..., 'sms': ...} records
# SECURITY NOTE: pickle.load can execute arbitrary code from the file it
# reads; data/sms_db.pickle must only ever be written by this program.
sms_db = [{'nodeID': 0, 'sms':[]}]
try:
    with open('data/sms_db.pickle', 'rb') as f:
        sms_db = pickle.load(f)
except Exception:
    # Missing or unreadable db: start over with the default one on disk.
    # `except Exception` (not a bare except) lets Ctrl-C / SystemExit through.
    logger.warning("System: SMS db not found, creating a new one")
    with open('data/sms_db.pickle', 'wb') as f:
        pickle.dump(sms_db, f)
def store_sms(nodeID, sms):
    """Add an SMS gateway address for ``nodeID`` and persist the db.

    Each address is stored as its own {'nodeID': ..., 'sms': <address>}
    record, which is the shape handle_sms() iterates over when sending.

    Args:
        nodeID: Node the address belongs to.
        sms: Email-to-SMS gateway address to add.

    Returns:
        True when the address was stored and written to data/sms_db.pickle,
        False when the node already has 5 addresses or storing failed.
    """
    try:
        logger.debug("System: Setting SMS for " + str(nodeID))

        # Count the addresses already stored for this node. A record may hold
        # a single address string or a list of addresses (the seed record).
        stored = 0
        for item in sms_db:
            if item['nodeID'] != nodeID:
                continue
            addresses = item['sms']
            stored += 1 if isinstance(addresses, str) else len(addresses)

        # if the nodeID already has 5 sms addresses warn and return
        if stored >= 5:
            logger.warning("System: 📵SMS limit reached for " + str(nodeID))
            return False

        sms_db.append({'nodeID': nodeID, 'sms': sms})

        # save to a pickle for persistence, this is a simple db, be mindful of risk
        with open('data/sms_db.pickle', 'wb') as f:
            pickle.dump(sms_db, f)
        return True
    except Exception as e:
        logger.warning("System: Failed to store SMS: " + str(e))
        return False
def handle_sms(nodeID, message):
    """Handle the ``clearsms``, ``setsms`` and ``sms:`` mesh commands.

    Args:
        nodeID: Node that sent the command.
        message: Raw command text, e.g. "setsms 5551234@phone.co" or
            "sms: hello there".

    Returns:
        A reply string for the mesh user describing the outcome.
    """
    global sms_db
    command = message.lower()

    # clearsms: remove all sms records for the node
    if command.startswith("clearsms"):
        if not any(item['nodeID'] == nodeID for item in sms_db):
            return "📲No address to clear"
        sms_db = [item for item in sms_db if item['nodeID'] != nodeID]
        # update the pickle
        with open('data/sms_db.pickle', 'wb') as f:
            pickle.dump(sms_db, f)
        return "📲 address cleared"

    # setsms: validate and store a gateway address
    if command.startswith("setsms"):
        parts = message.split(" ", 1)
        # A bare "setsms" has no second part; treat it as an empty address
        address = parts[1].strip() if len(parts) > 1 else ""
        if len(address) < 5:
            return "?📲setsms: example@phone.co"
        # Both "@" and "." are required, same rule as send_email()
        if "@" not in address or "." not in address:
            return "📲Please provide a valid email address"
        if store_sms(nodeID, address):
            return "📲SMS address set 📪"
        return "Failed to set address"

    # sms: send the text to every address stored for the node
    if command.startswith("sms:"):
        parts = message.split(" ", 1)
        body = parts[1] if len(parts) > 1 else ""
        if not body.strip():
            return "Error: ⛔️ format should be: sms: message"

        # Collect the node's addresses; a record may hold one address string
        # or a list of addresses.
        addresses = []
        for item in sms_db:
            if item['nodeID'] != nodeID:
                continue
            stored = item['sms']
            if isinstance(stored, str):
                stored = [stored]
            addresses.extend(addr for addr in stored if addr)
        if not addresses:
            return "📲No address set, use 📲setsms"

        count = 0
        for smsEmail in addresses:
            logger.info("System: Sending SMS for " + str(nodeID) + " to " + smsEmail[:-6])
            result = send_email(smsEmail, body, nodeID)
            if isinstance(result, str) and result:
                # send_email reports throttling/bans as a message string;
                # pass it on instead of counting it as a successful send
                return result
            if not result:
                return "Failed to send SMS"
            count += 1
        return "📲SMS sent " + str(count) + " addresses 📤"

    return "Error: ⛔️ not understood. use:setsms example@phone.co"
def _email_send_reply(result):
    """Translate a send_email() result into the reply shown to the mesh user."""
    if isinstance(result, str) and result:
        # send_email reports throttling/bans as a message string; pass it on
        # instead of announcing a successful send
        return result
    if result:
        return "📧Email-sent 📤"
    return "Failed to send email"


def handle_email(nodeID, message):
    """Handle the ``setemail`` and ``email:`` mesh commands.

    Args:
        nodeID: Node that sent the command.
        message: Raw command text. Supported forms:
            "setemail address@example.com" stores the node's address,
            "email: text" mails the text to the stored address,
            "email: address@example.com #text" mails the text to that address.

    Returns:
        A reply string for the mesh user describing the outcome.
    """
    try:
        command = message.lower()

        # setemail: validate and store the node's address
        if command.startswith("setemail"):
            parts = message.split(" ", 1)
            if len(parts) < 2:
                return "📧Please provide an email address"
            email_addr = parts[1].strip()
            if "@" not in email_addr or "." not in email_addr:
                return "📧Please provide a valid email address"
            if store_email(nodeID, email_addr):
                return "📧Email address set 📪"
            return "Error: ⛔️ Failed to set email address"

        # email: send to an inline address or to the stored one
        if command.startswith("email:"):
            parts = message.split(" ", 1)
            if len(parts) < 2:
                return "Error: ⛔️ format should be: email: message or, email: address@example.com #message"
            content = parts[1].strip()

            # Direct email: "address@example.com #message"
            if "@" in content and "#" in content:
                # "#" is present, so the split always yields address and body
                to_email, message_body = content.split("#", 1)
                to_email = to_email.strip()
                message_body = message_body.strip()
                logger.info(f"System: Sending email for {nodeID} to {to_email}")
                return _email_send_reply(send_email(to_email, message_body, nodeID))

            # Using stored email address
            if nodeID in email_db:
                logger.info(f"System: Sending email for {nodeID} to stored address")
                return _email_send_reply(send_email(email_db[nodeID], content, nodeID))

        return "Error: ⛔️ no email on file. use: setemail"
    except Exception as e:
        logger.error(f"System: Email handling error: {str(e)}")
        return "Failed to process email command"