feat: Add local node configuration validation, enhance report generation with custom filenames, and improve code quality with type hints and constants.

This commit is contained in:
eddieoz
2025-11-28 20:12:22 +02:00
parent ca612603cf
commit b822c6b116
10 changed files with 238 additions and 178 deletions

2
.gitignore vendored
View File

@@ -19,6 +19,6 @@ venv.bak/
config.yaml
report-*.md
report-*.json
reports/
report-*.html
GEMINI.md
CLAUDE.md

View File

@@ -3,24 +3,26 @@ import time
import threading
import meshtastic.util
from .utils import get_val, haversine
from . import constants
logger = logging.getLogger(__name__)
class ActiveTester:
def __init__(self, interface, priority_nodes=None, auto_discovery_roles=None, auto_discovery_limit=5, online_nodes=None, local_node_id=None, traceroute_timeout=60, test_interval=30, analysis_mode='distance', cluster_radius=2000):
def __init__(self, interface, priority_nodes=None, auto_discovery_roles=None, auto_discovery_limit=None, online_nodes=None, local_node_id=None, traceroute_timeout=None, test_interval=None, analysis_mode='distance', cluster_radius=None):
self.interface = interface
self.priority_nodes = priority_nodes if priority_nodes else []
self.auto_discovery_roles = auto_discovery_roles if auto_discovery_roles else ['ROUTER', 'REPEATER']
self.auto_discovery_limit = auto_discovery_limit
self.auto_discovery_roles = auto_discovery_roles if auto_discovery_roles else constants.DEFAULT_AUTO_DISCOVERY_ROLES
self.auto_discovery_limit = auto_discovery_limit if auto_discovery_limit is not None else constants.DEFAULT_AUTO_DISCOVERY_LIMIT
self.online_nodes = online_nodes if online_nodes else set()
self.local_node_id = local_node_id
self.last_test_time = 0
self.min_test_interval = test_interval # Seconds between active tests
self.min_test_interval = test_interval if test_interval is not None else constants.DEFAULT_TEST_INTERVAL
self.current_priority_index = 0
self.pending_traceroute = None # Store ID of node we are waiting for
self.traceroute_timeout = traceroute_timeout # Seconds to wait for a response
self.traceroute_timeout = traceroute_timeout if traceroute_timeout is not None else constants.DEFAULT_TRACEROUTE_TIMEOUT
self.analysis_mode = analysis_mode
self.cluster_radius = cluster_radius
self.cluster_radius = cluster_radius if cluster_radius is not None else constants.DEFAULT_CLUSTER_RADIUS
self.hop_limit = constants.DEFAULT_HOP_LIMIT
# Reporting Data
self.test_results = [] # List of dicts: {node_id, status, rtt, hops, snr, timestamp}
@@ -30,7 +32,7 @@ class ActiveTester:
# Thread safety
self.lock = threading.Lock()
def run_next_test(self):
def run_next_test(self) -> None:
"""
Runs the next scheduled test. Prioritizes nodes in the config list.
"""
@@ -67,7 +69,7 @@ class ActiveTester:
self.current_priority_index = (self.current_priority_index + 1) % len(self.priority_nodes)
def _auto_discover_nodes(self):
def _auto_discover_nodes(self) -> list:
"""
Selects nodes based on lastHeard timestamp, roles, and geolocation.
Uses the existing node database instead of waiting for packets.
@@ -223,7 +225,7 @@ class ActiveTester:
selected_ids = [c['id'] for c in final_candidates]
return selected_ids
def _get_router_cluster_nodes(self):
def _get_router_cluster_nodes(self) -> list:
"""
Selects nodes that are within cluster_radius of known routers.
"""
@@ -305,7 +307,7 @@ class ActiveTester:
return selected
def send_traceroute(self, dest_node_id):
def send_traceroute(self, dest_node_id: str) -> None:
"""
Sends a traceroute request to the destination node.
Runs in a separate thread to avoid blocking the main loop.
@@ -314,7 +316,7 @@ class ActiveTester:
def _send_task():
try:
self.interface.sendTraceRoute(dest_node_id, hopLimit=7)
self.interface.sendTraceRoute(dest_node_id, hopLimit=self.hop_limit)
logger.debug(f"Traceroute command sent to {dest_node_id}")
except Exception as e:
logger.error(f"Failed to send traceroute to {dest_node_id}: {e}")
@@ -405,7 +407,7 @@ class ActiveTester:
self.pending_traceroute = None # Clear pending if this was the node we were waiting for
self.last_test_time = time.time() # Start cooldown
def record_timeout(self, node_id):
def record_timeout(self, node_id: str) -> None:
"""
Records a failed test result (timeout).
"""
@@ -437,11 +439,4 @@ class ActiveTester:
logger.info(f"Completed Test Cycle {self.completed_cycles}")
self.nodes_tested_in_cycle.clear()
def flood_test(self, dest_node_id, count=5):
"""
CAUTION: Sends multiple messages to test reliability.
"""
logger.warning(f"Starting FLOOD TEST to {dest_node_id} (Count: {count})")
for i in range(count):
self.interface.sendText(f"Flood test {i+1}/{count}", destinationId=dest_node_id)
time.sleep(5) # Wait 5 seconds between messages

View File

@@ -1,6 +1,7 @@
import logging
import time
from .utils import get_val, haversine, get_node_name
from . import constants
logger = logging.getLogger(__name__)
@@ -11,17 +12,17 @@ class NetworkHealthAnalyzer:
# Load thresholds from config or use defaults
thresholds = self.config.get('thresholds', {})
self.ch_util_threshold = thresholds.get('channel_utilization', 25.0)
self.air_util_threshold = thresholds.get('air_util_tx', 7.0) # Updated default to 7%
self.router_density_threshold = thresholds.get('router_density_threshold', 2000)
self.active_threshold_seconds = thresholds.get('active_threshold_seconds', 7200)
self.max_nodes_long_fast = self.config.get('max_nodes_for_long_fast', 60)
self.ch_util_threshold = thresholds.get('channel_utilization', constants.DEFAULT_CHANNEL_UTILIZATION_THRESHOLD)
self.air_util_threshold = thresholds.get('air_util_tx', constants.DEFAULT_AIR_UTIL_TX_THRESHOLD)
self.router_density_threshold = thresholds.get('router_density_threshold', constants.DEFAULT_ROUTER_DENSITY_THRESHOLD)
self.active_threshold_seconds = thresholds.get('active_threshold_seconds', constants.DEFAULT_ACTIVE_THRESHOLD_SECONDS)
self.max_nodes_long_fast = self.config.get('max_nodes_for_long_fast', constants.DEFAULT_MAX_NODES_LONG_FAST)
# Data storage for detailed analysis
self.cluster_data = [] # Router cluster details with distances
self.ch_util_data = {} # Channel utilization analysis
def analyze(self, nodes, packet_history=None, my_node=None, test_results=None):
def analyze(self, nodes: dict, packet_history: list = None, my_node: dict = None, test_results: list = None) -> list:
"""
Analyzes the node DB and packet history for potential issues.
Returns a list of issue strings.
@@ -97,7 +98,44 @@ class NetworkHealthAnalyzer:
return issues
def get_router_stats(self, nodes, test_results=None):
return issues
def _calculate_router_distances(self, router: dict, nodes: dict, radius: float) -> tuple:
    """
    Helper to calculate neighbors and nearby routers for a given router.

    Args:
        router: Dict with at least 'id', 'lat' and 'lon' for the router under analysis.
        nodes: Node database mapping node_id -> node dict.
        radius: Neighborhood radius in meters.

    Returns:
        tuple: (total_neighbors, nearby_routers_count)
    """
    nearby_routers = 0
    total_neighbors = 0
    # Role encodings that count as "router": numeric protobuf values and
    # their string names (ROUTER_CLIENT=2, ROUTER=3, ROUTER_LATE=9).
    router_roles_int = (2, 3, 9)
    router_roles_str = ('ROUTER', 'ROUTER_CLIENT', 'ROUTER_LATE')
    for node_id, node in nodes.items():
        if node_id == router['id']:
            continue
        pos = get_val(node, 'position', {})
        lat = get_val(pos, 'latitude')
        lon = get_val(pos, 'longitude')
        # Explicit None checks: 0.0 is a valid coordinate (equator /
        # prime meridian) and must not be treated as "missing".
        if lat is None or lon is None:
            continue
        dist = haversine(router['lat'], router['lon'], lat, lon)
        if dist < radius:
            total_neighbors += 1
            # Check if the neighbor is itself a router.
            user = get_val(node, 'user', {})
            role = get_val(user, 'role')
            if (isinstance(role, int) and role in router_roles_int) or role in router_roles_str:
                nearby_routers += 1
    return total_neighbors, nearby_routers
def get_router_stats(self, nodes: dict, test_results: list = None) -> list:
"""
Calculates detailed statistics for each router.
Returns a list of dictionaries.
@@ -136,26 +174,8 @@ class NetworkHealthAnalyzer:
# 2. Analyze Each Router
for r in routers:
# A. Neighbors (within configured radius)
nearby_routers = 0
total_neighbors = 0
radius = self.router_density_threshold
for node_id, node in nodes.items():
if node_id == r['id']: continue
pos = get_val(node, 'position', {})
lat = get_val(pos, 'latitude')
lon = get_val(pos, 'longitude')
if lat and lon:
dist = haversine(r['lat'], r['lon'], lat, lon)
if dist < radius:
total_neighbors += 1
# Check if it's also a router
# (Simplified check, ideally we'd check against the routers list but this is O(N))
user = get_val(node, 'user', {})
role = get_val(user, 'role')
if role in [2, 3, 'ROUTER', 'ROUTER_CLIENT']:
nearby_routers += 1
total_neighbors, nearby_routers = self._calculate_router_distances(r, nodes, radius)
# B. Relay Count
relay_count = 0
@@ -202,7 +222,7 @@ class NetworkHealthAnalyzer:
return stats
def check_router_efficiency(self, nodes, test_results=None):
def check_router_efficiency(self, nodes: dict, test_results: list = None) -> list:
"""
Analyzes router placement and efficiency.
Returns a list of issue strings.
@@ -220,7 +240,7 @@ class NetworkHealthAnalyzer:
return issues
def analyze_channel_utilization(self, nodes):
def analyze_channel_utilization(self, nodes: dict) -> None:
"""
Analyzes channel utilization across the network.
Determines if congestion is mesh-wide or isolated to specific nodes.
@@ -265,7 +285,7 @@ class NetworkHealthAnalyzer:
'affected_count': len(high_util_nodes)
}
def check_client_relaying_over_router(self, nodes, test_results):
def check_client_relaying_over_router(self, nodes: dict, test_results: list) -> list:
"""
Detects ineffective routers by checking if nearby CLIENT nodes
are relaying more frequently than the router itself.
@@ -370,7 +390,7 @@ class NetworkHealthAnalyzer:
return issues
def check_route_quality(self, nodes, test_results):
def check_route_quality(self, nodes: dict, test_results: list) -> list:
"""
Analyzes the quality of routes found in traceroute tests.
Checks for Hop Efficiency and Favorite Router usage.
@@ -412,7 +432,7 @@ class NetworkHealthAnalyzer:
return list(set(issues))
def check_duplication(self, history, nodes):
def check_duplication(self, history: list, nodes: dict) -> list:
"""
Detects if the same message ID is being received multiple times.
"""
@@ -430,7 +450,7 @@ class NetworkHealthAnalyzer:
issues.append(f"Spam: Detected {count} duplicates for Packet ID {pkt_id}. Possible routing loop or aggressive re-broadcasting.")
return issues
def check_hop_counts(self, history, nodes):
def check_hop_counts(self, history: list, nodes: dict) -> list:
"""
Checks if packets are arriving with high hop counts.
"""
@@ -449,7 +469,7 @@ class NetworkHealthAnalyzer:
def check_network_size_and_preset(self, nodes):
def check_network_size_and_preset(self, nodes: dict) -> list:
"""
Checks if network size exceeds recommendations for the current preset.
Note: We can't easily know the *current* preset of the network just from node DB,
@@ -475,7 +495,7 @@ class NetworkHealthAnalyzer:
return issues
def check_router_density(self, nodes, test_results=None):
def check_router_density(self, nodes: dict, test_results: list = None) -> tuple:
"""
Checks for high density of routers.
Identifies clusters of routers within 'router_density_threshold'.
@@ -575,7 +595,7 @@ class NetworkHealthAnalyzer:
return issues, cluster_data
def check_signal_vs_distance(self, nodes, my_node):
def check_signal_vs_distance(self, nodes: dict, my_node: dict) -> list:
"""
Checks for nodes that are close but have poor SNR (indicating obstruction or antenna issues).
"""

View File

@@ -0,0 +1,76 @@
"""
Configuration validator for Meshtastic node settings.
This module validates local node configuration and provides warnings
for non-optimal settings.
"""
import logging
logger = logging.getLogger(__name__)
class ConfigValidator:
"""Validates Meshtastic node configuration."""
@staticmethod
def check_local_config(interface) -> None:
"""
Analyzes the local node's configuration and warns about non-optimal settings.
Args:
interface: The Meshtastic interface object with access to localNode
"""
logger.info("Checking local node configuration...")
try:
# Wait a moment for node to populate if needed (though interface init usually does it)
node = None
if hasattr(interface, 'localNode'):
node = interface.localNode
if not node:
logger.warning("Could not access local node information.")
return
# 1. Check Role
try:
# Note: node.config might be a property of the node object
# In some versions, it's node.localConfig
if hasattr(node, 'config'):
config = node.config
elif hasattr(node, 'localConfig'):
config = node.localConfig
else:
logger.warning("Could not find config attribute on local node.")
return
from meshtastic.protobuf import config_pb2
role = config.device.role
role_name = config_pb2.Config.DeviceConfig.Role.Name(role)
if role_name in ['ROUTER', 'ROUTER_CLIENT', 'REPEATER']:
logger.warning(f" [!] Local Node Role is '{role_name}'.")
logger.warning(" Recommended for monitoring: 'CLIENT' or 'CLIENT_MUTE'.")
logger.warning(" (Active monitoring works best when the monitor itself isn't a router)")
else:
logger.info(f"Local Node Role: {role_name} (OK)")
except Exception as e:
logger.warning(f"Could not verify role: {e}")
# 2. Check Hop Limit
try:
if hasattr(node, 'config'):
config = node.config
elif hasattr(node, 'localConfig'):
config = node.localConfig
hop_limit = config.lora.hop_limit
if hop_limit > 3:
logger.warning(f" [!] Local Node Hop Limit is {hop_limit}.")
logger.warning(" Recommended: 3. High hop limits can cause network congestion.")
else:
logger.info(f"Local Node Hop Limit: {hop_limit} (OK)")
except Exception as e:
logger.warning(f"Could not verify hop limit: {e}")
except Exception as e:
logger.error(f"Failed to check local config: {e}")

View File

@@ -0,0 +1,34 @@
"""
Constants and default values for the Meshtastic Network Monitor.
This module centralizes all default configuration values to ensure
a single source of truth and easier maintenance.
"""
# Thresholds
DEFAULT_CHANNEL_UTILIZATION_THRESHOLD = 25.0 # Percentage of channel time considered congested
DEFAULT_AIR_UTIL_TX_THRESHOLD = 7.0 # Percentage of TX airtime considered excessive
DEFAULT_ROUTER_DENSITY_THRESHOLD = 2000 # Meters; radius used when checking router clustering
DEFAULT_ACTIVE_THRESHOLD_SECONDS = 7200 # 2 hours; nodes heard within this window count as active
DEFAULT_MAX_NODES_LONG_FAST = 60 # Recommended max mesh size for the LongFast preset
# Timeouts and Intervals
DEFAULT_TRACEROUTE_TIMEOUT = 60 # Seconds to wait for a traceroute response
DEFAULT_TEST_INTERVAL = 30 # Seconds between tests
DEFAULT_ANALYSIS_INTERVAL = 60 # Seconds between analysis runs
DEFAULT_DISCOVERY_WAIT_SECONDS = 60 # Seconds to wait during discovery
# Active Testing
DEFAULT_HOP_LIMIT = 7 # Maximum hops for traceroute
DEFAULT_AUTO_DISCOVERY_LIMIT = 5 # Number of nodes to auto-discover
DEFAULT_AUTO_DISCOVERY_ROLES = ['ROUTER', 'REPEATER'] # Node roles targeted by auto-discovery
# Geospatial
DEFAULT_CLUSTER_RADIUS = 2000 # Meters for router cluster analysis
# Reporting
DEFAULT_REPORT_CYCLES = 1 # Number of test cycles before generating report
DEFAULT_REPORT_DIR = "reports" # Directory where Markdown/JSON reports are written
# Logging
DEFAULT_LOG_LEVEL = "INFO" # Fallback when config.yaml does not set log_level

View File

@@ -9,6 +9,8 @@ import meshtastic.util
from .analyzer import NetworkHealthAnalyzer
from .active_tests import ActiveTester
from .reporter import NetworkReporter
from .config_validator import ConfigValidator
from . import constants
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -26,14 +28,13 @@ class MeshMonitor:
self.hostname = hostname
self.config = self.load_config(config_file)
self.analyzer = NetworkHealthAnalyzer(config=self.config, ignore_no_position=ignore_no_position)
self.reporter = NetworkReporter(report_dir="reports", config=self.config)
self.reporter = NetworkReporter(report_dir=self.config.get('report_dir', constants.DEFAULT_REPORT_DIR), config=self.config)
self.active_tester = None
self.running = False
self.config = self.load_config(config_file)
self.packet_history = [] # List of recent packets for duplication check
# Configure Log Level
log_level_str = self.config.get('log_level', 'info').upper()
log_level_str = self.config.get('log_level', constants.DEFAULT_LOG_LEVEL).upper()
log_level = getattr(logging, log_level_str, logging.INFO)
logger.setLevel(log_level)
logging.getLogger().setLevel(log_level) # Set root logger too to capture lib logs if needed
@@ -43,10 +44,10 @@ class MeshMonitor:
# Discovery State
self.discovery_mode = False
self.discovery_start_time = 0
self.discovery_wait_seconds = self.config.get('discovery_wait_seconds', 60)
self.discovery_wait_seconds = self.config.get('discovery_wait_seconds', constants.DEFAULT_DISCOVERY_WAIT_SECONDS)
self.online_nodes = set()
def load_config(self, config_file):
def load_config(self, config_file: str) -> dict:
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
@@ -55,7 +56,7 @@ class MeshMonitor:
logger.error(f"Error loading config file: {e}")
return {}
def start(self):
def start(self) -> None:
logger.info(f"Connecting to Meshtastic node via {self.interface_type}...")
try:
# ... interface init ...
@@ -69,7 +70,7 @@ class MeshMonitor:
raise ValueError(f"Unknown interface type: {self.interface_type}")
# Check local config
self.check_local_config()
ConfigValidator.check_local_config(self.interface)
priority_nodes = self.config.get('priority_nodes', [])
auto_discovery_roles = self.config.get('auto_discovery_roles', ['ROUTER', 'REPEATER'])
@@ -178,76 +179,17 @@ class MeshMonitor:
logger.error(f"Failed to connect or run: {e}")
self.stop()
def check_local_config(self):
"""
Analyzes the local node's configuration and warns about non-optimal settings.
"""
logger.info("Checking local node configuration...")
try:
# Wait a moment for node to populate if needed (though interface init usually does it)
node = None
if hasattr(self.interface, 'localNode'):
node = self.interface.localNode
if not node:
logger.warning("Could not access local node information.")
return
# 1. Check Role
# We access the protobuf config directly
try:
# Note: node.config might be a property of the node object
# In some versions, it's node.localConfig
# Let's try to access it safely
if hasattr(node, 'config'):
config = node.config
elif hasattr(node, 'localConfig'):
config = node.localConfig
else:
logger.warning("Could not find config attribute on local node.")
return
from meshtastic.protobuf import config_pb2
role = config.device.role
role_name = config_pb2.Config.DeviceConfig.Role.Name(role)
if role_name in ['ROUTER', 'ROUTER_CLIENT', 'REPEATER']:
logger.warning(f" [!] Local Node Role is '{role_name}'.")
logger.warning(" Recommended for monitoring: 'CLIENT' or 'CLIENT_MUTE'.")
logger.warning(" (Active monitoring works best when the monitor itself isn't a router)")
else:
logger.info(f"Local Node Role: {role_name} (OK)")
except Exception as e:
logger.warning(f"Could not verify role: {e}")
# 2. Check Hop Limit
try:
if hasattr(node, 'config'):
config = node.config
elif hasattr(node, 'localConfig'):
config = node.localConfig
hop_limit = config.lora.hop_limit
if hop_limit > 3:
logger.warning(f" [!] Local Node Hop Limit is {hop_limit}.")
logger.warning(" Recommended: 3. High hop limits can cause network congestion.")
else:
logger.info(f"Local Node Hop Limit: {hop_limit} (OK)")
except Exception as e:
logger.warning(f"Could not verify hop limit: {e}")
except Exception as e:
logger.error(f"Failed to check local config: {e}")
def stop(self):
def stop(self) -> None:
self.running = False
if self.interface:
self.interface.close()
def on_receive(self, packet, interface):
try:
# Store packet for analysis
# We need: id, fromId, hopLimit (if available)
def on_receive(self, packet: dict, interface) -> None:
"""
Callback for received packets.
""" # We need: id, fromId, hopLimit (if available)
pkt_info = {
'id': packet.get('id'),
'fromId': packet.get('fromId'),
@@ -290,10 +232,16 @@ class MeshMonitor:
except Exception as e:
logger.error(f"Error parsing packet: {e}")
def on_connection(self, interface, topic=pub.AUTO_TOPIC):
def on_connection(self, interface, topic=pub.AUTO_TOPIC) -> None:
"""
Callback for connection established.
"""
logger.info("Connection established signal received.")
def on_node_info(self, node, interface):
def on_node_info(self, nodeInfo: dict, interface) -> None:
"""
Callback for node info updates.
"""
# logger.debug(f"Node info updated: {node}")
pass
@@ -318,7 +266,7 @@ class MeshMonitor:
node['position']['longitude'] = pos['lon']
logger.debug(f"Applied manual position to {node_id}: {pos}")
def main_loop(self):
def main_loop(self) -> None:
logger.info("Starting monitoring loop...")
while self.running:
try:

View File

@@ -17,14 +17,22 @@ class NetworkReporter:
# Ensure report directory exists
os.makedirs(self.report_dir, exist_ok=True)
def generate_report(self, nodes, test_results, analysis_issues, local_node=None, router_stats=None, analyzer=None, override_timestamp=None, override_location=None, save_json=True):
def generate_report(self, nodes: dict, test_results: list, analysis_issues: list, local_node: dict = None, router_stats: list = None, analyzer: object = None, override_timestamp: str = None, override_location: str = None, save_json: bool = True, output_filename: str = None) -> str:
"""
Generates a Markdown report based on collected data.
Also persists all raw data to JSON format.
analyzer: NetworkHealthAnalyzer instance with cluster_data and ch_util_data
override_timestamp: Optional timestamp string to use (for regeneration)
override_location: Optional location string to use (for regeneration)
save_json: Whether to save the raw data to JSON (default: True)
Args:
nodes: Dictionary of nodes
test_results: List of test results
analysis_issues: List of analysis issue strings
local_node: Local node information
router_stats: Router statistics
analyzer: NetworkHealthAnalyzer instance with cluster_data and ch_util_data
override_timestamp: Optional timestamp string to use (for regeneration)
override_location: Optional location string to use (for regeneration)
save_json: Whether to save the raw data to JSON (default: True)
output_filename: Optional custom filename for the report (without extension)
"""
if override_timestamp:
timestamp = override_timestamp
@@ -32,9 +40,17 @@ class NetworkReporter:
else:
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
report_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Use custom filename if provided, otherwise use timestamp-based name
if output_filename:
filename = output_filename if output_filename.endswith('.md') else f"{output_filename}.md"
# Extract base name without extension for JSON
base_name = output_filename.replace('.md', '')
json_filename = f"{base_name}.json"
else:
filename = f"report-{timestamp}.md"
json_filename = f"report-{timestamp}.json"
filename = f"report-{timestamp}.md"
json_filename = f"report-{timestamp}.json"
filepath = os.path.join(self.report_dir, filename)
json_filepath = os.path.join(self.report_dir, json_filename)

View File

@@ -3,7 +3,7 @@ import logging
logger = logging.getLogger(__name__)
def haversine(lat1, lon1, lat2, lon2):
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees).
@@ -28,7 +28,7 @@ def haversine(lat1, lon1, lat2, lon2):
logger.debug(f"Error calculating haversine distance: {e}")
return 0
def get_val(obj, key, default=None):
def get_val(obj: object, key: str, default: any = None) -> any:
"""
Safely retrieves a value from an object or dictionary.
Handles nested attribute access if key contains dots (e.g. 'user.id').
@@ -50,7 +50,7 @@ def get_val(obj, key, default=None):
except Exception:
return default
def get_node_name(node, node_id=None):
def get_node_name(node: dict, node_id: str = None) -> str:
"""
Helper to get a human-readable name for a node.
"""

4
reports/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

View File

@@ -114,44 +114,10 @@ def generate_report_from_json(json_filepath, output_path=None):
# Use new issues for the report
analysis_issues = new_issues
# We need to temporarily override the filename generation if custom output is specified
# Determine output filename if provided
output_filename = None
if output_path:
# Monkey-patch the generate_report to use custom filename
original_generate = reporter.generate_report
def custom_generate(nodes, test_results, analysis_issues, local_node=None, router_stats=None, analyzer=None, override_timestamp=None, override_location=None, save_json=True):
# Temporarily change the method to use custom filename
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
custom_filename = os.path.basename(output_path)
filepath = os.path.join(report_dir, custom_filename)
from mesh_analyzer.route_analyzer import RouteAnalyzer
route_analyzer = RouteAnalyzer(nodes)
route_analysis_local = route_analyzer.analyze_routes(test_results)
try:
with open(filepath, "w") as f:
f.write(f"# Meshtastic Network Report\n")
f.write(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"**Regenerated from:** {os.path.basename(json_filepath)}\n\n")
reporter._write_executive_summary(f, nodes, test_results, analysis_issues, local_node)
reporter._write_network_health(f, analysis_issues, analyzer)
if router_stats:
reporter._write_router_performance_table(f, router_stats)
reporter._write_route_analysis(f, route_analysis_local)
reporter._write_traceroute_results(f, test_results, nodes, local_node)
reporter._write_recommendations(f, analysis_issues, test_results, analyzer)
print(f"✅ Report regenerated successfully: {filepath}")
return filepath
except Exception as e:
print(f"❌ Failed to generate report: {e}")
return None
reporter.generate_report = custom_generate
output_filename = os.path.basename(output_path)
# Extract session metadata
# Use the 'session' variable already extracted from 'full_data'
@@ -168,7 +134,8 @@ def generate_report_from_json(json_filepath, output_path=None):
analyzer=analyzer, # Pass analyzer parameter
override_timestamp=original_timestamp,
override_location=test_location,
save_json=False # Do not overwrite JSON when regenerating
save_json=False, # Do not overwrite JSON when regenerating
output_filename=output_filename
)
return result