mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-03-04 23:27:46 +01:00
Add structured logging and improved startup/shutdown handling
- Add consistent logging format across all modules (timestamp, file:line, PID, level) - Add startup logging for MQTT connection, web server startup with URL display - Add MQTT message processing metrics (count and rate logging every 10k messages) - Add graceful shutdown handling with signal handlers and PID file cleanup - Add configurable HTTP access log toggle via config.ini (default: disabled) - Replace print() statements with proper logger calls throughout - Update .gitignore to exclude PID files (meshview-db.pid, meshview-web.pid) - Update documentation for new logging configuration options 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -3,6 +3,8 @@ __pycache__/*
|
||||
meshview/__pycache__/*
|
||||
meshtastic/protobuf/*
|
||||
packets.db
|
||||
meshview-db.pid
|
||||
meshview-web.pid
|
||||
/table_details.py
|
||||
config.ini
|
||||
screenshots/*
|
||||
|
||||
13
README.md
13
README.md
@@ -210,6 +210,17 @@ hour = 2
|
||||
minute = 00
|
||||
# Run VACUUM after cleanup
|
||||
vacuum = False
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Logging Configuration
|
||||
# -------------------------
|
||||
[logging]
|
||||
# Enable or disable HTTP access logs from the web server
|
||||
# When disabled, request logs like "GET /api/chat" will not appear
|
||||
# Application logs (errors, startup messages, etc.) are unaffected
|
||||
# Set to True to enable, False to disable (default: False)
|
||||
access_log = False
|
||||
```
|
||||
|
||||
---
|
||||
@@ -402,5 +413,3 @@ Add schedule to the bottom of the file (modify /path/to/file/ to the correct pat
|
||||
```
|
||||
|
||||
Check the log file to see it the script run at the specific time.
|
||||
|
||||
|
||||
|
||||
@@ -1,13 +1,23 @@
|
||||
import base64
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
import aiomqtt
|
||||
import logging
|
||||
from google.protobuf.message import DecodeError
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
|
||||
|
||||
KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==")
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def decrypt(packet):
|
||||
if packet.HasField("decoded"):
|
||||
@@ -27,6 +37,8 @@ def decrypt(packet):
|
||||
|
||||
async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
|
||||
identifier = str(random.getrandbits(16))
|
||||
msg_count = 0
|
||||
start_time = None
|
||||
while True:
|
||||
try:
|
||||
async with aiomqtt.Client(
|
||||
@@ -36,10 +48,16 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
|
||||
password=mqtt_passwd,
|
||||
identifier=identifier,
|
||||
) as client:
|
||||
|
||||
logger.info(f"Connected to MQTT broker at {mqtt_server}:{mqtt_port}")
|
||||
for topic in topics:
|
||||
print(f"Subscribing to: {topic}")
|
||||
logger.info(f"Subscribing to: {topic}")
|
||||
await client.subscribe(topic)
|
||||
|
||||
# Reset start time when connected
|
||||
if start_time is None:
|
||||
start_time = time.time()
|
||||
|
||||
async for msg in client.messages:
|
||||
try:
|
||||
envelope = ServiceEnvelope.FromString(msg.payload)
|
||||
@@ -52,11 +70,19 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
|
||||
continue
|
||||
|
||||
# Skip packets from specific node
|
||||
# FIXME: make this configurable as a list of node IDs to skip
|
||||
if getattr(envelope.packet, "from", None) == 2144342101:
|
||||
continue
|
||||
|
||||
msg_count += 1
|
||||
# FIXME: make this interval configurable or time based
|
||||
if msg_count % 10000 == 0: # Log notice every 10000 messages (approx every hour at 3/sec)
|
||||
elapsed_time = time.time() - start_time
|
||||
msg_rate = msg_count / elapsed_time if elapsed_time > 0 else 0
|
||||
logger.info(f"Processed {msg_count} messages so far... ({msg_rate:.2f} msg/sec)")
|
||||
|
||||
yield msg.topic.value, envelope
|
||||
|
||||
except aiomqtt.MqttError as e:
|
||||
print(f"MQTT error: {e}, reconnecting in 1s...")
|
||||
logger.error(f"MQTT error: {e}, reconnecting in 1s...")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@@ -2,6 +2,7 @@ import asyncio
|
||||
import datetime
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import ssl
|
||||
from collections import Counter, defaultdict
|
||||
@@ -23,6 +24,14 @@ import re
|
||||
import traceback
|
||||
import pathlib
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SEQ_REGEX = re.compile(r"seq \d+")
|
||||
SOFTWARE_RELEASE= "2.0.7 ~ 09-17-25"
|
||||
CONFIG = config.CONFIG
|
||||
@@ -439,7 +448,7 @@ async def firehose_updates(request):
|
||||
try:
|
||||
last_time = datetime.datetime.fromisoformat(last_time_str)
|
||||
except Exception as e:
|
||||
print(f"Failed to parse last_time '{last_time_str}': {e}")
|
||||
logger.error(f"Failed to parse last_time '{last_time_str}': {e}")
|
||||
last_time = None
|
||||
|
||||
# Query packets after last_time (microsecond precision)
|
||||
@@ -462,7 +471,7 @@ async def firehose_updates(request):
|
||||
return web.json_response(response)
|
||||
|
||||
except Exception as e:
|
||||
print("Error in /firehose/updates:", e)
|
||||
logger.error(f"Error in /firehose/updates: {e}")
|
||||
return web.json_response({"error": "Failed to fetch updates"}, status=500)
|
||||
|
||||
|
||||
@@ -1112,7 +1121,7 @@ async def net(request):
|
||||
raise # Let aiohttp handle HTTP exceptions properly
|
||||
|
||||
except Exception as e:
|
||||
print("Error processing net request")
|
||||
logger.error(f"Error processing net request: {e}")
|
||||
template = env.get_template("error.html")
|
||||
rendered = template.render(
|
||||
error_message="An internal server error occurred.",
|
||||
@@ -1219,7 +1228,7 @@ async def top(request):
|
||||
content_type="text/html",
|
||||
)
|
||||
except Exception as e:
|
||||
print("Error in /top:", e)
|
||||
logger.error(f"Error in /top: {e}")
|
||||
template = env.get_template("error.html")
|
||||
rendered = template.render(
|
||||
error_message="An error occurred in /top",
|
||||
@@ -1241,7 +1250,7 @@ async def chat(request):
|
||||
content_type="text/html",
|
||||
)
|
||||
except Exception as e:
|
||||
print("Error in /chat:", e)
|
||||
logger.error(f"Error in /chat: {e}")
|
||||
template = env.get_template("error.html")
|
||||
rendered = template.render(
|
||||
error_message="An error occurred while processing your request.",
|
||||
@@ -1301,7 +1310,7 @@ async def nodegraph(request):
|
||||
edges_map[edge_pair]["weight"] += 1
|
||||
edges_map[edge_pair]["type"] = "neighbor" # Overrides an existing traceroute pairing with neighbor
|
||||
except Exception as e:
|
||||
print(f"Error decoding NeighborInfo packet: {e}")
|
||||
logger.error(f"Error decoding NeighborInfo packet: {e}")
|
||||
|
||||
# Convert edges_map to a list of dicts with colors
|
||||
max_weight = max(i['weight'] for i in edges_map.values()) if edges_map else 1
|
||||
@@ -1390,7 +1399,7 @@ async def api_chat(request):
|
||||
try:
|
||||
since = datetime.datetime.fromisoformat(since_str)
|
||||
except Exception as e:
|
||||
print(f"Failed to parse since '{since_str}': {e}")
|
||||
logger.error(f"Failed to parse since '{since_str}': {e}")
|
||||
|
||||
# Fetch packets from store
|
||||
packets = await store.get_packets(
|
||||
@@ -1456,7 +1465,7 @@ async def api_chat(request):
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
print("Error in /api/chat:", e)
|
||||
logger.error(f"Error in /api/chat: {e}")
|
||||
return web.json_response(
|
||||
{"error": "Failed to fetch chat data", "details": str(e)},
|
||||
status=500
|
||||
@@ -1506,7 +1515,7 @@ async def api_nodes(request):
|
||||
return web.json_response({"nodes": nodes_data})
|
||||
|
||||
except Exception as e:
|
||||
print("Error in /api/nodes:", e)
|
||||
logger.error(f"Error in /api/nodes: {e}")
|
||||
return web.json_response({"error": "Failed to fetch nodes"}, status=500)
|
||||
|
||||
|
||||
@@ -1523,7 +1532,7 @@ async def api_packets(request):
|
||||
try:
|
||||
since_time = datetime.datetime.fromisoformat(since_str)
|
||||
except Exception as e:
|
||||
print(f"Failed to parse 'since' timestamp '{since_str}': {e}")
|
||||
logger.error(f"Failed to parse 'since' timestamp '{since_str}': {e}")
|
||||
|
||||
# Fetch last N packets
|
||||
packets = await store.get_packets(
|
||||
@@ -1545,7 +1554,7 @@ async def api_packets(request):
|
||||
return web.json_response({"packets": packets_json})
|
||||
|
||||
except Exception as e:
|
||||
print("Error in /api/packets:", str(e))
|
||||
logger.error(f"Error in /api/packets: {e}")
|
||||
return web.json_response(
|
||||
{"error": "Failed to fetch packets"},
|
||||
status=500
|
||||
@@ -1638,7 +1647,7 @@ async def api_edges(request):
|
||||
try:
|
||||
route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
|
||||
except Exception as e:
|
||||
print(f"Error decoding Traceroute {tr.id}: {e}")
|
||||
logger.error(f"Error decoding Traceroute {tr.id}: {e}")
|
||||
continue
|
||||
|
||||
path = [tr.packet.from_node_id] + list(route.route)
|
||||
@@ -1656,7 +1665,7 @@ async def api_edges(request):
|
||||
for node in neighbor_info.neighbors:
|
||||
edges.setdefault((node.node_id, packet.from_node_id), "neighbor")
|
||||
except Exception as e:
|
||||
print(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}")
|
||||
logger.error(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}")
|
||||
|
||||
return web.json_response({
|
||||
"edges": [
|
||||
@@ -1687,15 +1696,27 @@ async def serve_page(request):
|
||||
async def run_server():
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
runner = web.AppRunner(app)
|
||||
|
||||
# Check if access logging should be disabled
|
||||
enable_access_log = CONFIG.get("logging", {}).get("access_log", "False").lower() == "true"
|
||||
access_log_handler = None if not enable_access_log else logging.getLogger("aiohttp.access")
|
||||
|
||||
runner = web.AppRunner(app, access_log=access_log_handler)
|
||||
await runner.setup()
|
||||
if CONFIG["server"]["tls_cert"]:
|
||||
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
ssl_context.load_cert_chain(CONFIG["server"]["tls_cert"])
|
||||
logger.info(f"TLS enabled with certificate: {CONFIG['server']['tls_cert']}")
|
||||
else:
|
||||
ssl_context = None
|
||||
logger.info("TLS disabled")
|
||||
if host := CONFIG["server"]["bind"]:
|
||||
site = web.TCPSite(runner, host, CONFIG["server"]["port"], ssl_context=ssl_context)
|
||||
port = CONFIG["server"]["port"]
|
||||
protocol = "https" if ssl_context else "http"
|
||||
site = web.TCPSite(runner, host, port, ssl_context=ssl_context)
|
||||
await site.start()
|
||||
# Display localhost instead of wildcard addresses for usability
|
||||
display_host = "localhost" if host in ("0.0.0.0", "*", "::") else host
|
||||
logger.info(f"Web server started at {protocol}://{display_host}:{port}")
|
||||
while True:
|
||||
await asyncio.sleep(3600) # sleep forever
|
||||
|
||||
108
mvrun.py
108
mvrun.py
@@ -1,9 +1,63 @@
|
||||
import argparse
|
||||
import threading
|
||||
import subprocess
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global list to track running processes
|
||||
running_processes = []
|
||||
pid_files = []
|
||||
|
||||
def cleanup_pid_file(pid_file):
|
||||
"""Remove a PID file if it exists"""
|
||||
if os.path.exists(pid_file):
|
||||
try:
|
||||
os.remove(pid_file)
|
||||
logger.info(f"Removed PID file {pid_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error removing PID file {pid_file}: {e}")
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
"""Handle Ctrl-C gracefully"""
|
||||
logger.info("Received interrupt signal (Ctrl-C), shutting down gracefully...")
|
||||
|
||||
# Terminate all running processes
|
||||
for process in running_processes:
|
||||
if process and process.poll() is None: # Process is still running
|
||||
try:
|
||||
logger.info(f"Terminating process PID {process.pid}")
|
||||
process.terminate()
|
||||
# Give it a moment to terminate gracefully
|
||||
try:
|
||||
process.wait(timeout=5)
|
||||
logger.info(f"Process PID {process.pid} terminated successfully")
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"Process PID {process.pid} did not terminate, forcing kill")
|
||||
process.kill()
|
||||
process.wait()
|
||||
except Exception as e:
|
||||
logger.error(f"Error terminating process PID {process.pid}: {e}")
|
||||
|
||||
# Clean up PID files
|
||||
for pid_file in pid_files:
|
||||
cleanup_pid_file(pid_file)
|
||||
|
||||
logger.info("Shutdown complete")
|
||||
sys.exit(0)
|
||||
|
||||
# Run python in subprocess
|
||||
def run_script(script_name, *args):
|
||||
def run_script(script_name, pid_file, *args):
|
||||
process = None
|
||||
try:
|
||||
# Path to the Python interpreter inside the virtual environment
|
||||
python_executable = './env/bin/python'
|
||||
@@ -11,31 +65,65 @@ def run_script(script_name, *args):
|
||||
# Combine the script name and arguments
|
||||
command = [python_executable, script_name] + list(args)
|
||||
|
||||
# Run the subprocess and report errors
|
||||
subprocess.run(command, check=True)
|
||||
# Run the subprocess (output goes directly to console for real-time viewing)
|
||||
process = subprocess.Popen(command)
|
||||
|
||||
# Track the process globally
|
||||
running_processes.append(process)
|
||||
|
||||
# Write PID to file
|
||||
with open(pid_file, 'w') as f:
|
||||
f.write(str(process.pid))
|
||||
logger.info(f"Started {script_name} with PID {process.pid}, written to {pid_file}")
|
||||
|
||||
# Wait for the process to complete
|
||||
process.wait()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error running {script_name}: {e}")
|
||||
logger.error(f"Error running {script_name}: {e}")
|
||||
finally:
|
||||
# Clean up PID file when process exits
|
||||
cleanup_pid_file(pid_file)
|
||||
|
||||
# Parse runtime argument (--config) and start subprocess threads
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Helper script to run the datbase and web frontend in separate threads.")
|
||||
# Register signal handler for Ctrl-C
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
parser = argparse.ArgumentParser(description="Helper script to run the database and web frontend in separate threads.")
|
||||
|
||||
# Add --config runtime argument
|
||||
parser.add_argument('--config', help="Path to the configuration file.", default='config.ini')
|
||||
args = parser.parse_args()
|
||||
|
||||
# PID file paths
|
||||
db_pid_file = 'meshview-db.pid'
|
||||
web_pid_file = 'meshview-web.pid'
|
||||
|
||||
# Track PID files globally for cleanup
|
||||
pid_files.append(db_pid_file)
|
||||
pid_files.append(web_pid_file)
|
||||
|
||||
# Database Thread
|
||||
dbthrd = threading.Thread(target=run_script, args=('startdb.py', '--config', args.config))
|
||||
dbthrd = threading.Thread(target=run_script, args=('startdb.py', db_pid_file, '--config', args.config))
|
||||
|
||||
# Web server thread
|
||||
webthrd = threading.Thread(target=run_script, args=('main.py', '--config', args.config))
|
||||
webthrd = threading.Thread(target=run_script, args=('main.py', web_pid_file, '--config', args.config))
|
||||
|
||||
# Start Meshview subprocess threads
|
||||
logger.info(f"Starting Meshview with config: {args.config}")
|
||||
logger.info("Starting database thread...")
|
||||
dbthrd.start()
|
||||
logger.info("Starting web server thread...")
|
||||
webthrd.start()
|
||||
|
||||
dbthrd.join()
|
||||
webthrd.join()
|
||||
try:
|
||||
dbthrd.join()
|
||||
webthrd.join()
|
||||
except KeyboardInterrupt:
|
||||
# This shouldn't be reached due to signal handler, but just in case
|
||||
signal_handler(signal.SIGINT, None)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
main()
|
||||
|
||||
@@ -94,4 +94,15 @@ days_to_keep = 14
|
||||
hour = 2
|
||||
minute = 00
|
||||
# Run VACUUM after cleanup
|
||||
vacuum = False
|
||||
vacuum = False
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Logging Configuration
|
||||
# -------------------------
|
||||
[logging]
|
||||
# Enable or disable HTTP access logs from the web server
|
||||
# When disabled, request logs like "GET /api/chat" will not appear
|
||||
# Application logs (errors, startup messages, etc.) are unaffected
|
||||
# Set to True to enable, False to disable (default: False)
|
||||
access_log = False
|
||||
|
||||
Reference in New Issue
Block a user