diff --git a/.gitignore b/.gitignore index 944be0a..0939dc1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/* meshview/__pycache__/* meshtastic/protobuf/* packets.db +meshview-db.pid +meshview-web.pid /table_details.py config.ini screenshots/* diff --git a/README.md b/README.md index 400f2d9..1552065 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,17 @@ hour = 2 minute = 00 # Run VACUUM after cleanup vacuum = False + + +# ------------------------- +# Logging Configuration +# ------------------------- +[logging] +# Enable or disable HTTP access logs from the web server +# When disabled, request logs like "GET /api/chat" will not appear +# Application logs (errors, startup messages, etc.) are unaffected +# Set to True to enable, False to disable (default: False) +access_log = False ``` --- @@ -401,5 +412,3 @@ Add schedule to the bottom of the file (modify /path/to/file/ to the correct pat ``` Check the log file to see it the script run at the specific time. - - diff --git a/meshview/mqtt_reader.py b/meshview/mqtt_reader.py index 859128b..e2e6b79 100644 --- a/meshview/mqtt_reader.py +++ b/meshview/mqtt_reader.py @@ -1,13 +1,23 @@ import base64 import asyncio import random +import time import aiomqtt +import logging from google.protobuf.message import DecodeError from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) + +logger = logging.getLogger(__name__) + def decrypt(packet): if packet.HasField("decoded"): @@ -27,6 +37,8 @@ def decrypt(packet): async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd): identifier = str(random.getrandbits(16)) + msg_count = 0 + start_time = None while True: try: async with aiomqtt.Client( @@ -36,10 +48,16 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa password=mqtt_passwd, identifier=identifier, ) as client: + + logger.info(f"Connected to MQTT broker at {mqtt_server}:{mqtt_port}") for topic in topics: - print(f"Subscribing to: {topic}") + logger.info(f"Subscribing to: {topic}") await client.subscribe(topic) + # Reset start time when connected + if start_time is None: + start_time = time.time() + async for msg in client.messages: try: envelope = ServiceEnvelope.FromString(msg.payload) @@ -52,11 +70,19 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa continue # Skip packets from specific node + # FIXME: make this configurable as a list of node IDs to skip if getattr(envelope.packet, "from", None) == 2144342101: continue + msg_count += 1 + # FIXME: make this interval configurable or time based + if msg_count % 10000 == 0: # Log notice every 10000 messages (approx every hour at 3/sec) + elapsed_time = time.time() - start_time + msg_rate = msg_count / elapsed_time if elapsed_time > 0 else 0 + logger.info(f"Processed {msg_count} messages so far... ({msg_rate:.2f} msg/sec)") + yield msg.topic.value, envelope except aiomqtt.MqttError as e: - print(f"MQTT error: {e}, reconnecting in 1s...") + logger.error(f"MQTT error: {e}, reconnecting in 1s...") await asyncio.sleep(1) diff --git a/meshview/web.py b/meshview/web.py index 977086c..dd5f328 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -2,6 +2,7 @@ import asyncio import datetime from datetime import timedelta import json +import logging import os import ssl from collections import Counter, defaultdict @@ -23,6 +24,14 @@ import re import traceback import pathlib +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) + +logger = logging.getLogger(__name__) + SEQ_REGEX = re.compile(r"seq \d+") SOFTWARE_RELEASE= "2.0.7 ~ 09-17-25" CONFIG = config.CONFIG @@ -439,7 +448,7 @@ async def firehose_updates(request): try: last_time = datetime.datetime.fromisoformat(last_time_str) except Exception as e: - print(f"Failed to parse last_time '{last_time_str}': {e}") + logger.error(f"Failed to parse last_time '{last_time_str}': {e}") last_time = None # Query packets after last_time (microsecond precision) @@ -462,7 +471,7 @@ async def firehose_updates(request): return web.json_response(response) except Exception as e: - print("Error in /firehose/updates:", e) + logger.error(f"Error in /firehose/updates: {e}") return web.json_response({"error": "Failed to fetch updates"}, status=500) @@ -1112,7 +1121,7 @@ async def net(request): raise # Let aiohttp handle HTTP exceptions properly except Exception as e: - print("Error processing net request") + logger.error(f"Error processing net request: {e}") template = env.get_template("error.html") rendered = template.render( error_message="An internal server error occurred.", @@ -1219,7 +1228,7 @@ async def top(request): content_type="text/html", ) except Exception as e: - print("Error in /top:", e) + logger.error(f"Error in /top: {e}") template = env.get_template("error.html") rendered = template.render( error_message="An error occurred in /top", @@ -1241,7 +1250,7 @@ async def chat(request): content_type="text/html", ) except Exception as e: - print("Error in /chat:", e) + logger.error(f"Error in /chat: {e}") template = env.get_template("error.html") rendered = template.render( error_message="An error occurred while processing your request.", @@ -1301,7 +1310,7 @@ async def nodegraph(request): edges_map[edge_pair]["weight"] += 1 edges_map[edge_pair]["type"] = "neighbor" # Overrides an existing traceroute pairing with neighbor except Exception as e: - print(f"Error decoding NeighborInfo packet: {e}") + logger.error(f"Error decoding NeighborInfo packet: {e}") # Convert edges_map to a list of dicts with colors max_weight = max(i['weight'] for i in edges_map.values()) if edges_map else 1 @@ -1390,7 +1399,7 @@ async def api_chat(request): try: since = datetime.datetime.fromisoformat(since_str) except Exception as e: - print(f"Failed to parse since '{since_str}': {e}") + logger.error(f"Failed to parse since '{since_str}': {e}") # Fetch packets from store packets = await store.get_packets( @@ -1456,7 +1465,7 @@ async def api_chat(request): }) except Exception as e: - print("Error in /api/chat:", e) + logger.error(f"Error in /api/chat: {e}") return web.json_response( {"error": "Failed to fetch chat data", "details": str(e)}, status=500 @@ -1506,7 +1515,7 @@ async def api_nodes(request): return web.json_response({"nodes": nodes_data}) except Exception as e: - print("Error in /api/nodes:", e) + logger.error(f"Error in /api/nodes: {e}") return web.json_response({"error": "Failed to fetch nodes"}, status=500) @@ -1523,7 +1532,7 @@ async def api_packets(request): try: since_time = datetime.datetime.fromisoformat(since_str) except Exception as e: - print(f"Failed to parse 'since' timestamp '{since_str}': {e}") + logger.error(f"Failed to parse 'since' timestamp '{since_str}': {e}") # Fetch last N packets packets = await store.get_packets( @@ -1545,7 +1554,7 @@ async def api_packets(request): return web.json_response({"packets": packets_json}) except Exception as e: - print("Error in /api/packets:", str(e)) + logger.error(f"Error in /api/packets: {e}") return web.json_response( {"error": "Failed to fetch packets"}, status=500 @@ -1638,7 +1647,7 @@ async def api_edges(request): try: route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) except Exception as e: - print(f"Error decoding Traceroute {tr.id}: {e}") + logger.error(f"Error decoding Traceroute {tr.id}: {e}") continue path = [tr.packet.from_node_id] + list(route.route) @@ -1656,7 +1665,7 @@ async def api_edges(request): for node in neighbor_info.neighbors: edges.setdefault((node.node_id, packet.from_node_id), "neighbor") except Exception as e: - print(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}") + logger.error(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}") return web.json_response({ "edges": [ @@ -1687,15 +1696,27 @@ async def serve_page(request): async def run_server(): app = web.Application() app.add_routes(routes) - runner = web.AppRunner(app) + + # Check if access logging should be disabled + enable_access_log = CONFIG.get("logging", {}).get("access_log", "False").lower() == "true" + access_log_handler = None if not enable_access_log else logging.getLogger("aiohttp.access") + + runner = web.AppRunner(app, access_log=access_log_handler) await runner.setup() if CONFIG["server"]["tls_cert"]: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(CONFIG["server"]["tls_cert"]) + logger.info(f"TLS enabled with certificate: {CONFIG['server']['tls_cert']}") else: ssl_context = None + logger.info("TLS disabled") if host := CONFIG["server"]["bind"]: - site = web.TCPSite(runner, host, CONFIG["server"]["port"], ssl_context=ssl_context) + port = CONFIG["server"]["port"] + protocol = "https" if ssl_context else "http" + site = web.TCPSite(runner, host, port, ssl_context=ssl_context) await site.start() + # Display localhost instead of wildcard addresses for usability + display_host = "localhost" if host in ("0.0.0.0", "*", "::") else host + logger.info(f"Web server started at {protocol}://{display_host}:{port}") while True: await asyncio.sleep(3600) # sleep forever diff --git a/mvrun.py b/mvrun.py index f1e4f09..02f83bf 100644 --- a/mvrun.py +++ b/mvrun.py @@ -1,9 +1,63 @@ import argparse import threading import subprocess +import logging +import os +import signal +import sys + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) + +logger = logging.getLogger(__name__) + +# Global list to track running processes +running_processes = [] +pid_files = [] + +def cleanup_pid_file(pid_file): + """Remove a PID file if it exists""" + if os.path.exists(pid_file): + try: + os.remove(pid_file) + logger.info(f"Removed PID file {pid_file}") + except Exception as e: + logger.error(f"Error removing PID file {pid_file}: {e}") + +def signal_handler(sig, frame): + """Handle Ctrl-C gracefully""" + logger.info("Received interrupt signal (Ctrl-C), shutting down gracefully...") + + # Terminate all running processes + for process in running_processes: + if process and process.poll() is None: # Process is still running + try: + logger.info(f"Terminating process PID {process.pid}") + process.terminate() + # Give it a moment to terminate gracefully + try: + process.wait(timeout=5) + logger.info(f"Process PID {process.pid} terminated successfully") + except subprocess.TimeoutExpired: + logger.warning(f"Process PID {process.pid} did not terminate, forcing kill") + process.kill() + process.wait() + except Exception as e: + logger.error(f"Error terminating process PID {process.pid}: {e}") + + # Clean up PID files + for pid_file in pid_files: + cleanup_pid_file(pid_file) + + logger.info("Shutdown complete") + sys.exit(0) # Run python in subprocess -def run_script(script_name, *args): +def run_script(script_name, pid_file, *args): + process = None try: # Path to the Python interpreter inside the virtual environment python_executable = './env/bin/python' @@ -11,31 +65,65 @@ def run_script(script_name, *args): # Combine the script name and arguments command = [python_executable, script_name] + list(args) - # Run the subprocess and report errors - subprocess.run(command, check=True) + # Run the subprocess (output goes directly to console for real-time viewing) + process = subprocess.Popen(command) + + # Track the process globally + running_processes.append(process) + + # Write PID to file + with open(pid_file, 'w') as f: + f.write(str(process.pid)) + logger.info(f"Started {script_name} with PID {process.pid}, written to {pid_file}") + + # Wait for the process to complete + process.wait() + except Exception as e: - print(f"Error running {script_name}: {e}") + logger.error(f"Error running {script_name}: {e}") + finally: + # Clean up PID file when process exits + cleanup_pid_file(pid_file) # Parse runtime argument (--config) and start subprocess threads def main(): - parser = argparse.ArgumentParser(description="Helper script to run the datbase and web frontend in separate threads.") + # Register signal handler for Ctrl-C + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + parser = argparse.ArgumentParser(description="Helper script to run the database and web frontend in separate threads.") # Add --config runtime argument parser.add_argument('--config', help="Path to the configuration file.", default='config.ini') args = parser.parse_args() + # PID file paths + db_pid_file = 'meshview-db.pid' + web_pid_file = 'meshview-web.pid' + + # Track PID files globally for cleanup + pid_files.append(db_pid_file) + pid_files.append(web_pid_file) + # Database Thread - dbthrd = threading.Thread(target=run_script, args=('startdb.py', '--config', args.config)) + dbthrd = threading.Thread(target=run_script, args=('startdb.py', db_pid_file, '--config', args.config)) # Web server thread - webthrd = threading.Thread(target=run_script, args=('main.py', '--config', args.config)) + webthrd = threading.Thread(target=run_script, args=('main.py', web_pid_file, '--config', args.config)) # Start Meshview subprocess threads + logger.info(f"Starting Meshview with config: {args.config}") + logger.info("Starting database thread...") dbthrd.start() + logger.info("Starting web server thread...") webthrd.start() - dbthrd.join() - webthrd.join() + try: + dbthrd.join() + webthrd.join() + except KeyboardInterrupt: + # This shouldn't be reached due to signal handler, but just in case + signal_handler(signal.SIGINT, None) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/sample.config.ini b/sample.config.ini index cbb12c8..8da5647 100644 --- a/sample.config.ini +++ b/sample.config.ini @@ -94,4 +94,15 @@ days_to_keep = 14 hour = 2 minute = 00 # Run VACUUM after cleanup -vacuum = False \ No newline at end of file +vacuum = False + + +# ------------------------- +# Logging Configuration +# ------------------------- +[logging] +# Enable or disable HTTP access logs from the web server +# When disabled, request logs like "GET /api/chat" will not appear +# Application logs (errors, startup messages, etc.) are unaffected +# Set to True to enable, False to disable (default: False) +access_log = False