Merge remote-tracking branch 'origin/master'

This commit is contained in:
Pablo Revilla
2025-10-03 11:59:35 -07:00
6 changed files with 187 additions and 30 deletions

2
.gitignore vendored
View File

@@ -3,6 +3,8 @@ __pycache__/*
meshview/__pycache__/*
meshtastic/protobuf/*
packets.db
meshview-db.pid
meshview-web.pid
/table_details.py
config.ini
screenshots/*

View File

@@ -209,6 +209,17 @@ hour = 2
minute = 00
# Run VACUUM after cleanup
vacuum = False
# -------------------------
# Logging Configuration
# -------------------------
[logging]
# Enable or disable HTTP access logs from the web server
# When disabled, request logs like "GET /api/chat" will not appear
# Application logs (errors, startup messages, etc.) are unaffected
# Set to True to enable, False to disable (default: False)
access_log = False
```
---
@@ -401,5 +412,3 @@ Add schedule to the bottom of the file (modify /path/to/file/ to the correct pat
```
Check the log file to see if the script ran at the specified time.

View File

@@ -1,13 +1,23 @@
import base64
import asyncio
import random
import time
import aiomqtt
import logging
from google.protobuf.message import DecodeError
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
def decrypt(packet):
if packet.HasField("decoded"):
@@ -27,6 +37,8 @@ def decrypt(packet):
async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
identifier = str(random.getrandbits(16))
msg_count = 0
start_time = None
while True:
try:
async with aiomqtt.Client(
@@ -36,10 +48,16 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
password=mqtt_passwd,
identifier=identifier,
) as client:
logger.info(f"Connected to MQTT broker at {mqtt_server}:{mqtt_port}")
for topic in topics:
print(f"Subscribing to: {topic}")
logger.info(f"Subscribing to: {topic}")
await client.subscribe(topic)
# Reset start time when connected
if start_time is None:
start_time = time.time()
async for msg in client.messages:
try:
envelope = ServiceEnvelope.FromString(msg.payload)
@@ -52,11 +70,19 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
continue
# Skip packets from specific node
# FIXME: make this configurable as a list of node IDs to skip
if getattr(envelope.packet, "from", None) == 2144342101:
continue
msg_count += 1
# FIXME: make this interval configurable or time based
if msg_count % 10000 == 0: # Log notice every 10000 messages (approx every hour at 3/sec)
elapsed_time = time.time() - start_time
msg_rate = msg_count / elapsed_time if elapsed_time > 0 else 0
logger.info(f"Processed {msg_count} messages so far... ({msg_rate:.2f} msg/sec)")
yield msg.topic.value, envelope
except aiomqtt.MqttError as e:
print(f"MQTT error: {e}, reconnecting in 1s...")
logger.error(f"MQTT error: {e}, reconnecting in 1s...")
await asyncio.sleep(1)

View File

@@ -2,6 +2,7 @@ import asyncio
import datetime
from datetime import timedelta
import json
import logging
import os
import ssl
from collections import Counter, defaultdict
@@ -23,6 +24,14 @@ import re
import traceback
import pathlib
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
SEQ_REGEX = re.compile(r"seq \d+")
SOFTWARE_RELEASE= "2.0.7 ~ 09-17-25"
CONFIG = config.CONFIG
@@ -439,7 +448,7 @@ async def firehose_updates(request):
try:
last_time = datetime.datetime.fromisoformat(last_time_str)
except Exception as e:
print(f"Failed to parse last_time '{last_time_str}': {e}")
logger.error(f"Failed to parse last_time '{last_time_str}': {e}")
last_time = None
# Query packets after last_time (microsecond precision)
@@ -462,7 +471,7 @@ async def firehose_updates(request):
return web.json_response(response)
except Exception as e:
print("Error in /firehose/updates:", e)
logger.error(f"Error in /firehose/updates: {e}")
return web.json_response({"error": "Failed to fetch updates"}, status=500)
@@ -1112,7 +1121,7 @@ async def net(request):
raise # Let aiohttp handle HTTP exceptions properly
except Exception as e:
print("Error processing net request")
logger.error(f"Error processing net request: {e}")
template = env.get_template("error.html")
rendered = template.render(
error_message="An internal server error occurred.",
@@ -1219,7 +1228,7 @@ async def top(request):
content_type="text/html",
)
except Exception as e:
print("Error in /top:", e)
logger.error(f"Error in /top: {e}")
template = env.get_template("error.html")
rendered = template.render(
error_message="An error occurred in /top",
@@ -1241,7 +1250,7 @@ async def chat(request):
content_type="text/html",
)
except Exception as e:
print("Error in /chat:", e)
logger.error(f"Error in /chat: {e}")
template = env.get_template("error.html")
rendered = template.render(
error_message="An error occurred while processing your request.",
@@ -1301,7 +1310,7 @@ async def nodegraph(request):
edges_map[edge_pair]["weight"] += 1
edges_map[edge_pair]["type"] = "neighbor" # Overrides an existing traceroute pairing with neighbor
except Exception as e:
print(f"Error decoding NeighborInfo packet: {e}")
logger.error(f"Error decoding NeighborInfo packet: {e}")
# Convert edges_map to a list of dicts with colors
max_weight = max(i['weight'] for i in edges_map.values()) if edges_map else 1
@@ -1390,7 +1399,7 @@ async def api_chat(request):
try:
since = datetime.datetime.fromisoformat(since_str)
except Exception as e:
print(f"Failed to parse since '{since_str}': {e}")
logger.error(f"Failed to parse since '{since_str}': {e}")
# Fetch packets from store
packets = await store.get_packets(
@@ -1456,7 +1465,7 @@ async def api_chat(request):
})
except Exception as e:
print("Error in /api/chat:", e)
logger.error(f"Error in /api/chat: {e}")
return web.json_response(
{"error": "Failed to fetch chat data", "details": str(e)},
status=500
@@ -1506,7 +1515,7 @@ async def api_nodes(request):
return web.json_response({"nodes": nodes_data})
except Exception as e:
print("Error in /api/nodes:", e)
logger.error(f"Error in /api/nodes: {e}")
return web.json_response({"error": "Failed to fetch nodes"}, status=500)
@@ -1523,7 +1532,7 @@ async def api_packets(request):
try:
since_time = datetime.datetime.fromisoformat(since_str)
except Exception as e:
print(f"Failed to parse 'since' timestamp '{since_str}': {e}")
logger.error(f"Failed to parse 'since' timestamp '{since_str}': {e}")
# Fetch last N packets
packets = await store.get_packets(
@@ -1545,7 +1554,7 @@ async def api_packets(request):
return web.json_response({"packets": packets_json})
except Exception as e:
print("Error in /api/packets:", str(e))
logger.error(f"Error in /api/packets: {e}")
return web.json_response(
{"error": "Failed to fetch packets"},
status=500
@@ -1638,7 +1647,7 @@ async def api_edges(request):
try:
route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
except Exception as e:
print(f"Error decoding Traceroute {tr.id}: {e}")
logger.error(f"Error decoding Traceroute {tr.id}: {e}")
continue
path = [tr.packet.from_node_id] + list(route.route)
@@ -1656,7 +1665,7 @@ async def api_edges(request):
for node in neighbor_info.neighbors:
edges.setdefault((node.node_id, packet.from_node_id), "neighbor")
except Exception as e:
print(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}")
logger.error(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}")
return web.json_response({
"edges": [
@@ -1687,15 +1696,27 @@ async def serve_page(request):
async def run_server():
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(app)
# Check if access logging should be disabled
enable_access_log = CONFIG.get("logging", {}).get("access_log", "False").lower() == "true"
access_log_handler = None if not enable_access_log else logging.getLogger("aiohttp.access")
runner = web.AppRunner(app, access_log=access_log_handler)
await runner.setup()
if CONFIG["server"]["tls_cert"]:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(CONFIG["server"]["tls_cert"])
logger.info(f"TLS enabled with certificate: {CONFIG['server']['tls_cert']}")
else:
ssl_context = None
logger.info("TLS disabled")
if host := CONFIG["server"]["bind"]:
site = web.TCPSite(runner, host, CONFIG["server"]["port"], ssl_context=ssl_context)
port = CONFIG["server"]["port"]
protocol = "https" if ssl_context else "http"
site = web.TCPSite(runner, host, port, ssl_context=ssl_context)
await site.start()
# Display localhost instead of wildcard addresses for usability
display_host = "localhost" if host in ("0.0.0.0", "*", "::") else host
logger.info(f"Web server started at {protocol}://{display_host}:{port}")
while True:
await asyncio.sleep(3600) # sleep forever

108
mvrun.py
View File

@@ -1,9 +1,63 @@
import argparse
import threading
import subprocess
import logging
import os
import signal
import sys
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
# Global list to track running processes
running_processes = []
pid_files = []
def cleanup_pid_file(pid_file):
    """Remove a PID file if it exists.

    Best-effort cleanup used during shutdown: a missing file is not an
    error, and any other failure is logged rather than raised so the
    rest of the shutdown sequence can continue.
    """
    try:
        # EAFP: remove directly instead of exists()-then-remove(), which
        # races if the file disappears between the check and the removal.
        os.remove(pid_file)
        logger.info(f"Removed PID file {pid_file}")
    except FileNotFoundError:
        pass  # already gone — nothing to clean up
    except Exception as e:
        logger.error(f"Error removing PID file {pid_file}: {e}")
def signal_handler(sig, frame):
    """Handle Ctrl-C gracefully"""
    logger.info("Received interrupt signal (Ctrl-C), shutting down gracefully...")
    # Stop every tracked child: polite terminate first, hard kill after 5s.
    for proc in running_processes:
        if not proc or proc.poll() is not None:
            continue  # never started, or already exited on its own
        try:
            logger.info(f"Terminating process PID {proc.pid}")
            proc.terminate()
            try:
                proc.wait(timeout=5)
                logger.info(f"Process PID {proc.pid} terminated successfully")
            except subprocess.TimeoutExpired:
                logger.warning(f"Process PID {proc.pid} did not terminate, forcing kill")
                proc.kill()
                proc.wait()
        except Exception as e:
            logger.error(f"Error terminating process PID {proc.pid}: {e}")
    # Drop any PID files the children left behind.
    for pid_path in pid_files:
        cleanup_pid_file(pid_path)
    logger.info("Shutdown complete")
    sys.exit(0)
# Run python in subprocess
def run_script(script_name, *args):
def run_script(script_name, pid_file, *args):
process = None
try:
# Path to the Python interpreter inside the virtual environment
python_executable = './env/bin/python'
@@ -11,31 +65,65 @@ def run_script(script_name, *args):
# Combine the script name and arguments
command = [python_executable, script_name] + list(args)
# Run the subprocess and report errors
subprocess.run(command, check=True)
# Run the subprocess (output goes directly to console for real-time viewing)
process = subprocess.Popen(command)
# Track the process globally
running_processes.append(process)
# Write PID to file
with open(pid_file, 'w') as f:
f.write(str(process.pid))
logger.info(f"Started {script_name} with PID {process.pid}, written to {pid_file}")
# Wait for the process to complete
process.wait()
except Exception as e:
print(f"Error running {script_name}: {e}")
logger.error(f"Error running {script_name}: {e}")
finally:
# Clean up PID file when process exits
cleanup_pid_file(pid_file)
# Parse runtime argument (--config) and start subprocess threads
def main():
parser = argparse.ArgumentParser(description="Helper script to run the datbase and web frontend in separate threads.")
# Register signal handler for Ctrl-C
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
parser = argparse.ArgumentParser(description="Helper script to run the database and web frontend in separate threads.")
# Add --config runtime argument
parser.add_argument('--config', help="Path to the configuration file.", default='config.ini')
args = parser.parse_args()
# PID file paths
db_pid_file = 'meshview-db.pid'
web_pid_file = 'meshview-web.pid'
# Track PID files globally for cleanup
pid_files.append(db_pid_file)
pid_files.append(web_pid_file)
# Database Thread
dbthrd = threading.Thread(target=run_script, args=('startdb.py', '--config', args.config))
dbthrd = threading.Thread(target=run_script, args=('startdb.py', db_pid_file, '--config', args.config))
# Web server thread
webthrd = threading.Thread(target=run_script, args=('main.py', '--config', args.config))
webthrd = threading.Thread(target=run_script, args=('main.py', web_pid_file, '--config', args.config))
# Start Meshview subprocess threads
logger.info(f"Starting Meshview with config: {args.config}")
logger.info("Starting database thread...")
dbthrd.start()
logger.info("Starting web server thread...")
webthrd.start()
dbthrd.join()
webthrd.join()
try:
dbthrd.join()
webthrd.join()
except KeyboardInterrupt:
# This shouldn't be reached due to signal handler, but just in case
signal_handler(signal.SIGINT, None)
if __name__ == '__main__':
main()
main()

View File

@@ -94,4 +94,15 @@ days_to_keep = 14
hour = 2
minute = 00
# Run VACUUM after cleanup
vacuum = False
vacuum = False
# -------------------------
# Logging Configuration
# -------------------------
[logging]
# Enable or disable HTTP access logs from the web server
# When disabled, request logs like "GET /api/chat" will not appear
# Application logs (errors, startup messages, etc.) are unaffected
# Set to True to enable, False to disable (default: False)
access_log = False