From b822c6b116bc8b336160e85b43da74c2c510bb36 Mon Sep 17 00:00:00 2001 From: eddieoz Date: Fri, 28 Nov 2025 20:12:22 +0200 Subject: [PATCH] feat: Add local node configuration validation, enhance report generation with custom filenames, and improve code quality with type hints and constants. --- .gitignore | 2 +- mesh_analyzer/active_tests.py | 35 +++++------ mesh_analyzer/analyzer.py | 90 ++++++++++++++++++----------- mesh_analyzer/config_validator.py | 76 ++++++++++++++++++++++++ mesh_analyzer/constants.py | 34 +++++++++++ mesh_analyzer/monitor.py | 96 +++++++------------------------ mesh_analyzer/reporter.py | 30 +++++++--- mesh_analyzer/utils.py | 6 +- reports/.gitignore | 4 ++ scripts/report_generate.py | 43 ++------------ 10 files changed, 238 insertions(+), 178 deletions(-) create mode 100644 mesh_analyzer/config_validator.py create mode 100644 mesh_analyzer/constants.py create mode 100644 reports/.gitignore diff --git a/.gitignore b/.gitignore index b7ce4f1..005f4dd 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,6 @@ venv.bak/ config.yaml report-*.md report-*.json -reports/ +report-*.html GEMINI.md CLAUDE.md \ No newline at end of file diff --git a/mesh_analyzer/active_tests.py b/mesh_analyzer/active_tests.py index f5e606b..1e91bd9 100644 --- a/mesh_analyzer/active_tests.py +++ b/mesh_analyzer/active_tests.py @@ -3,24 +3,26 @@ import time import threading import meshtastic.util from .utils import get_val, haversine +from . import constants logger = logging.getLogger(__name__) class ActiveTester: - def __init__(self, interface, priority_nodes=None, auto_discovery_roles=None, auto_discovery_limit=5, online_nodes=None, local_node_id=None, traceroute_timeout=60, test_interval=30, analysis_mode='distance', cluster_radius=2000): + def __init__(self, interface, priority_nodes=None, auto_discovery_roles=None, auto_discovery_limit=None, online_nodes=None, local_node_id=None, traceroute_timeout=None, test_interval=None, analysis_mode='distance', cluster_radius=None): self.interface = interface self.priority_nodes = priority_nodes if priority_nodes else [] - self.auto_discovery_roles = auto_discovery_roles if auto_discovery_roles else ['ROUTER', 'REPEATER'] - self.auto_discovery_limit = auto_discovery_limit + self.auto_discovery_roles = auto_discovery_roles if auto_discovery_roles else constants.DEFAULT_AUTO_DISCOVERY_ROLES + self.auto_discovery_limit = auto_discovery_limit if auto_discovery_limit is not None else constants.DEFAULT_AUTO_DISCOVERY_LIMIT self.online_nodes = online_nodes if online_nodes else set() self.local_node_id = local_node_id self.last_test_time = 0 - self.min_test_interval = test_interval # Seconds between active tests + self.min_test_interval = test_interval if test_interval is not None else constants.DEFAULT_TEST_INTERVAL self.current_priority_index = 0 self.pending_traceroute = None # Store ID of node we are waiting for - self.traceroute_timeout = traceroute_timeout # Seconds to wait for a response + self.traceroute_timeout = traceroute_timeout if traceroute_timeout is not None else constants.DEFAULT_TRACEROUTE_TIMEOUT self.analysis_mode = analysis_mode - self.cluster_radius = cluster_radius + self.cluster_radius = cluster_radius if cluster_radius is not None else constants.DEFAULT_CLUSTER_RADIUS + self.hop_limit = constants.DEFAULT_HOP_LIMIT # Reporting Data self.test_results = [] # List of dicts: {node_id, status, rtt, hops, snr, timestamp} @@ -30,7 +32,7 @@ class ActiveTester: # Thread safety self.lock = threading.Lock() - def run_next_test(self): + def run_next_test(self) -> None: """ Runs the next scheduled test. Prioritizes nodes in the config list. """ @@ -67,7 +69,7 @@ class ActiveTester: self.current_priority_index = (self.current_priority_index + 1) % len(self.priority_nodes) - def _auto_discover_nodes(self): + def _auto_discover_nodes(self) -> list: """ Selects nodes based on lastHeard timestamp, roles, and geolocation. Uses the existing node database instead of waiting for packets. @@ -223,7 +225,7 @@ class ActiveTester: selected_ids = [c['id'] for c in final_candidates] return selected_ids - def _get_router_cluster_nodes(self): + def _get_router_cluster_nodes(self) -> list: """ Selects nodes that are within cluster_radius of known routers. """ @@ -305,7 +307,7 @@ class ActiveTester: return selected - def send_traceroute(self, dest_node_id): + def send_traceroute(self, dest_node_id: str) -> None: """ Sends a traceroute request to the destination node. Runs in a separate thread to avoid blocking the main loop. @@ -314,7 +316,7 @@ class ActiveTester: def _send_task(): try: - self.interface.sendTraceRoute(dest_node_id, hopLimit=7) + self.interface.sendTraceRoute(dest_node_id, hopLimit=self.hop_limit) logger.debug(f"Traceroute command sent to {dest_node_id}") except Exception as e: logger.error(f"Failed to send traceroute to {dest_node_id}: {e}") @@ -405,7 +407,7 @@ class ActiveTester: self.pending_traceroute = None # Clear pending if this was the node we were waiting for self.last_test_time = time.time() # Start cooldown - def record_timeout(self, node_id): + def record_timeout(self, node_id: str) -> None: """ Records a failed test result (timeout). """ @@ -437,11 +439,4 @@ class ActiveTester: logger.info(f"Completed Test Cycle {self.completed_cycles}") self.nodes_tested_in_cycle.clear() - def flood_test(self, dest_node_id, count=5): - """ - CAUTION: Sends multiple messages to test reliability. - """ - logger.warning(f"Starting FLOOD TEST to {dest_node_id} (Count: {count})") - for i in range(count): - self.interface.sendText(f"Flood test {i+1}/{count}", destinationId=dest_node_id) - time.sleep(5) # Wait 5 seconds between messages + diff --git a/mesh_analyzer/analyzer.py b/mesh_analyzer/analyzer.py index b6330a8..57ffc5e 100644 --- a/mesh_analyzer/analyzer.py +++ b/mesh_analyzer/analyzer.py @@ -1,6 +1,7 @@ import logging import time from .utils import get_val, haversine, get_node_name +from . import constants logger = logging.getLogger(__name__) @@ -11,17 +12,17 @@ class NetworkHealthAnalyzer: # Load thresholds from config or use defaults thresholds = self.config.get('thresholds', {}) - self.ch_util_threshold = thresholds.get('channel_utilization', 25.0) - self.air_util_threshold = thresholds.get('air_util_tx', 7.0) # Updated default to 7% - self.router_density_threshold = thresholds.get('router_density_threshold', 2000) - self.active_threshold_seconds = thresholds.get('active_threshold_seconds', 7200) - self.max_nodes_long_fast = self.config.get('max_nodes_for_long_fast', 60) + self.ch_util_threshold = thresholds.get('channel_utilization', constants.DEFAULT_CHANNEL_UTILIZATION_THRESHOLD) + self.air_util_threshold = thresholds.get('air_util_tx', constants.DEFAULT_AIR_UTIL_TX_THRESHOLD) + self.router_density_threshold = thresholds.get('router_density_threshold', constants.DEFAULT_ROUTER_DENSITY_THRESHOLD) + self.active_threshold_seconds = thresholds.get('active_threshold_seconds', constants.DEFAULT_ACTIVE_THRESHOLD_SECONDS) + self.max_nodes_long_fast = self.config.get('max_nodes_for_long_fast', constants.DEFAULT_MAX_NODES_LONG_FAST) # Data storage for detailed analysis self.cluster_data = [] # Router cluster details with distances self.ch_util_data = {} # Channel utilization analysis - def analyze(self, nodes, packet_history=None, my_node=None, test_results=None): + def analyze(self, nodes: dict, packet_history: list = None, my_node: dict = None, test_results: list = None) -> list: """ Analyzes the node DB and packet history for potential issues. Returns a list of issue strings. @@ -97,7 +98,44 @@ class NetworkHealthAnalyzer: return issues - def get_router_stats(self, nodes, test_results=None): + return issues + + + def _calculate_router_distances(self, router: dict, nodes: dict, radius: float) -> tuple: + """ + Helper to calculate neighbors and nearby routers for a given router. + Returns: (total_neighbors, nearby_routers_count) + """ + nearby_routers = 0 + total_neighbors = 0 + + for node_id, node in nodes.items(): + if node_id == router['id']: continue + pos = get_val(node, 'position', {}) + lat = get_val(pos, 'latitude') + lon = get_val(pos, 'longitude') + + if lat and lon: + dist = haversine(router['lat'], router['lon'], lat, lon) + if dist < radius: + total_neighbors += 1 + # Check if it's also a router + user = get_val(node, 'user', {}) + role = get_val(user, 'role') + + is_router_role = False + if isinstance(role, int): + if role in [2, 3, 9]: # ROUTER_CLIENT, ROUTER, ROUTER_LATE + is_router_role = True + elif role in ['ROUTER', 'ROUTER_CLIENT', 'ROUTER_LATE']: + is_router_role = True + + if is_router_role: + nearby_routers += 1 + + return total_neighbors, nearby_routers + + def get_router_stats(self, nodes: dict, test_results: list = None) -> list: """ Calculates detailed statistics for each router. Returns a list of dictionaries. @@ -136,26 +174,8 @@ class NetworkHealthAnalyzer: # 2. Analyze Each Router for r in routers: # A. Neighbors (within configured radius) - nearby_routers = 0 - total_neighbors = 0 radius = self.router_density_threshold - - for node_id, node in nodes.items(): - if node_id == r['id']: continue - pos = get_val(node, 'position', {}) - lat = get_val(pos, 'latitude') - lon = get_val(pos, 'longitude') - - if lat and lon: - dist = haversine(r['lat'], r['lon'], lat, lon) - if dist < radius: - total_neighbors += 1 - # Check if it's also a router - # (Simplified check, ideally we'd check against the routers list but this is O(N)) - user = get_val(node, 'user', {}) - role = get_val(user, 'role') - if role in [2, 3, 'ROUTER', 'ROUTER_CLIENT']: - nearby_routers += 1 + total_neighbors, nearby_routers = self._calculate_router_distances(r, nodes, radius) # B. Relay Count relay_count = 0 @@ -202,7 +222,7 @@ class NetworkHealthAnalyzer: return stats - def check_router_efficiency(self, nodes, test_results=None): + def check_router_efficiency(self, nodes: dict, test_results: list = None) -> list: """ Analyzes router placement and efficiency. Returns a list of issue strings. @@ -220,7 +240,7 @@ class NetworkHealthAnalyzer: return issues - def analyze_channel_utilization(self, nodes): + def analyze_channel_utilization(self, nodes: dict) -> None: """ Analyzes channel utilization across the network. Determines if congestion is mesh-wide or isolated to specific nodes. @@ -265,7 +285,7 @@ class NetworkHealthAnalyzer: 'affected_count': len(high_util_nodes) } - def check_client_relaying_over_router(self, nodes, test_results): + def check_client_relaying_over_router(self, nodes: dict, test_results: list) -> list: """ Detects ineffective routers by checking if nearby CLIENT nodes are relaying more frequently than the router itself. @@ -370,7 +390,7 @@ class NetworkHealthAnalyzer: return issues - def check_route_quality(self, nodes, test_results): + def check_route_quality(self, nodes: dict, test_results: list) -> list: """ Analyzes the quality of routes found in traceroute tests. Checks for Hop Efficiency and Favorite Router usage. @@ -412,7 +432,7 @@ class NetworkHealthAnalyzer: return list(set(issues)) - def check_duplication(self, history, nodes): + def check_duplication(self, history: list, nodes: dict) -> list: """ Detects if the same message ID is being received multiple times. """ @@ -430,7 +450,7 @@ class NetworkHealthAnalyzer: issues.append(f"Spam: Detected {count} duplicates for Packet ID {pkt_id}. Possible routing loop or aggressive re-broadcasting.") return issues - def check_hop_counts(self, history, nodes): + def check_hop_counts(self, history: list, nodes: dict) -> list: """ Checks if packets are arriving with high hop counts. """ @@ -449,7 +469,7 @@ class NetworkHealthAnalyzer: - def check_network_size_and_preset(self, nodes): + def check_network_size_and_preset(self, nodes: dict) -> list: """ Checks if network size exceeds recommendations for the current preset. Note: We can't easily know the *current* preset of the network just from node DB, @@ -475,7 +495,7 @@ class NetworkHealthAnalyzer: return issues - def check_router_density(self, nodes, test_results=None): + def check_router_density(self, nodes: dict, test_results: list = None) -> tuple: """ Checks for high density of routers. Identifies clusters of routers within 'router_density_threshold'. @@ -575,7 +595,7 @@ class NetworkHealthAnalyzer: return issues, cluster_data - def check_signal_vs_distance(self, nodes, my_node): + def check_signal_vs_distance(self, nodes: dict, my_node: dict) -> list: """ Checks for nodes that are close but have poor SNR (indicating obstruction or antenna issues). """ diff --git a/mesh_analyzer/config_validator.py b/mesh_analyzer/config_validator.py new file mode 100644 index 0000000..2ae2dc9 --- /dev/null +++ b/mesh_analyzer/config_validator.py @@ -0,0 +1,76 @@ +""" +Configuration validator for Meshtastic node settings. + +This module validates local node configuration and provides warnings +for non-optimal settings. +""" +import logging + +logger = logging.getLogger(__name__) + + +class ConfigValidator: + """Validates Meshtastic node configuration.""" + + @staticmethod + def check_local_config(interface) -> None: + """ + Analyzes the local node's configuration and warns about non-optimal settings. + + Args: + interface: The Meshtastic interface object with access to localNode + """ + logger.info("Checking local node configuration...") + try: + # Wait a moment for node to populate if needed (though interface init usually does it) + node = None + if hasattr(interface, 'localNode'): + node = interface.localNode + + if not node: + logger.warning("Could not access local node information.") + return + + # 1. Check Role + try: + # Note: node.config might be a property of the node object + # In some versions, it's node.localConfig + if hasattr(node, 'config'): + config = node.config + elif hasattr(node, 'localConfig'): + config = node.localConfig + else: + logger.warning("Could not find config attribute on local node.") + return + + from meshtastic.protobuf import config_pb2 + role = config.device.role + role_name = config_pb2.Config.DeviceConfig.Role.Name(role) + + if role_name in ['ROUTER', 'ROUTER_CLIENT', 'REPEATER']: + logger.warning(f" [!] Local Node Role is '{role_name}'.") + logger.warning(" Recommended for monitoring: 'CLIENT' or 'CLIENT_MUTE'.") + logger.warning(" (Active monitoring works best when the monitor itself isn't a router)") + else: + logger.info(f"Local Node Role: {role_name} (OK)") + except Exception as e: + logger.warning(f"Could not verify role: {e}") + + # 2. Check Hop Limit + try: + if hasattr(node, 'config'): + config = node.config + elif hasattr(node, 'localConfig'): + config = node.localConfig + + hop_limit = config.lora.hop_limit + if hop_limit > 3: + logger.warning(f" [!] Local Node Hop Limit is {hop_limit}.") + logger.warning(" Recommended: 3. High hop limits can cause network congestion.") + else: + logger.info(f"Local Node Hop Limit: {hop_limit} (OK)") + except Exception as e: + logger.warning(f"Could not verify hop limit: {e}") + + except Exception as e: + logger.error(f"Failed to check local config: {e}") diff --git a/mesh_analyzer/constants.py b/mesh_analyzer/constants.py new file mode 100644 index 0000000..46293a5 --- /dev/null +++ b/mesh_analyzer/constants.py @@ -0,0 +1,34 @@ +""" +Constants and default values for the Meshtastic Network Monitor. + +This module centralizes all default configuration values to ensure +a single source of truth and easier maintenance. +""" + +# Thresholds +DEFAULT_CHANNEL_UTILIZATION_THRESHOLD = 25.0 # Percentage +DEFAULT_AIR_UTIL_TX_THRESHOLD = 7.0 # Percentage +DEFAULT_ROUTER_DENSITY_THRESHOLD = 2000 # Meters +DEFAULT_ACTIVE_THRESHOLD_SECONDS = 7200 # 2 hours +DEFAULT_MAX_NODES_LONG_FAST = 60 + +# Timeouts and Intervals +DEFAULT_TRACEROUTE_TIMEOUT = 60 # Seconds +DEFAULT_TEST_INTERVAL = 30 # Seconds between tests +DEFAULT_ANALYSIS_INTERVAL = 60 # Seconds between analysis runs +DEFAULT_DISCOVERY_WAIT_SECONDS = 60 # Seconds to wait during discovery + +# Active Testing +DEFAULT_HOP_LIMIT = 7 # Maximum hops for traceroute +DEFAULT_AUTO_DISCOVERY_LIMIT = 5 # Number of nodes to auto-discover +DEFAULT_AUTO_DISCOVERY_ROLES = ['ROUTER', 'REPEATER'] + +# Geospatial +DEFAULT_CLUSTER_RADIUS = 2000 # Meters for router cluster analysis + +# Reporting +DEFAULT_REPORT_CYCLES = 1 # Number of test cycles before generating report +DEFAULT_REPORT_DIR = "reports" + +# Logging +DEFAULT_LOG_LEVEL = "INFO" diff --git a/mesh_analyzer/monitor.py b/mesh_analyzer/monitor.py index 7c9b031..63ac4e3 100644 --- a/mesh_analyzer/monitor.py +++ b/mesh_analyzer/monitor.py @@ -9,6 +9,8 @@ import meshtastic.util from .analyzer import NetworkHealthAnalyzer from .active_tests import ActiveTester from .reporter import NetworkReporter +from .config_validator import ConfigValidator +from . import constants # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -26,14 +28,13 @@ class MeshMonitor: self.hostname = hostname self.config = self.load_config(config_file) self.analyzer = NetworkHealthAnalyzer(config=self.config, ignore_no_position=ignore_no_position) - self.reporter = NetworkReporter(report_dir="reports", config=self.config) + self.reporter = NetworkReporter(report_dir=self.config.get('report_dir', constants.DEFAULT_REPORT_DIR), config=self.config) self.active_tester = None self.running = False - self.config = self.load_config(config_file) self.packet_history = [] # List of recent packets for duplication check # Configure Log Level - log_level_str = self.config.get('log_level', 'info').upper() + log_level_str = self.config.get('log_level', constants.DEFAULT_LOG_LEVEL).upper() log_level = getattr(logging, log_level_str, logging.INFO) logger.setLevel(log_level) logging.getLogger().setLevel(log_level) # Set root logger too to capture lib logs if needed @@ -43,10 +44,10 @@ class MeshMonitor: # Discovery State self.discovery_mode = False self.discovery_start_time = 0 - self.discovery_wait_seconds = self.config.get('discovery_wait_seconds', 60) + self.discovery_wait_seconds = self.config.get('discovery_wait_seconds', constants.DEFAULT_DISCOVERY_WAIT_SECONDS) self.online_nodes = set() - def load_config(self, config_file): + def load_config(self, config_file: str) -> dict: if os.path.exists(config_file): try: with open(config_file, 'r') as f: @@ -55,7 +56,7 @@ class MeshMonitor: logger.error(f"Error loading config file: {e}") return {} - def start(self): + def start(self) -> None: logger.info(f"Connecting to Meshtastic node via {self.interface_type}...") try: # ... interface init ... @@ -69,7 +70,7 @@ class MeshMonitor: raise ValueError(f"Unknown interface type: {self.interface_type}") # Check local config - self.check_local_config() + ConfigValidator.check_local_config(self.interface) priority_nodes = self.config.get('priority_nodes', []) auto_discovery_roles = self.config.get('auto_discovery_roles', ['ROUTER', 'REPEATER']) @@ -178,76 +179,17 @@ class MeshMonitor: logger.error(f"Failed to connect or run: {e}") self.stop() - def check_local_config(self): - """ - Analyzes the local node's configuration and warns about non-optimal settings. - """ - logger.info("Checking local node configuration...") - try: - # Wait a moment for node to populate if needed (though interface init usually does it) - node = None - if hasattr(self.interface, 'localNode'): - node = self.interface.localNode - - if not node: - logger.warning("Could not access local node information.") - return - # 1. Check Role - # We access the protobuf config directly - try: - # Note: node.config might be a property of the node object - # In some versions, it's node.localConfig - # Let's try to access it safely - if hasattr(node, 'config'): - config = node.config - elif hasattr(node, 'localConfig'): - config = node.localConfig - else: - logger.warning("Could not find config attribute on local node.") - return - from meshtastic.protobuf import config_pb2 - role = config.device.role - role_name = config_pb2.Config.DeviceConfig.Role.Name(role) - - if role_name in ['ROUTER', 'ROUTER_CLIENT', 'REPEATER']: - logger.warning(f" [!] Local Node Role is '{role_name}'.") - logger.warning(" Recommended for monitoring: 'CLIENT' or 'CLIENT_MUTE'.") - logger.warning(" (Active monitoring works best when the monitor itself isn't a router)") - else: - logger.info(f"Local Node Role: {role_name} (OK)") - except Exception as e: - logger.warning(f"Could not verify role: {e}") - - # 2. Check Hop Limit - try: - if hasattr(node, 'config'): - config = node.config - elif hasattr(node, 'localConfig'): - config = node.localConfig - - hop_limit = config.lora.hop_limit - if hop_limit > 3: - logger.warning(f" [!] Local Node Hop Limit is {hop_limit}.") - logger.warning(" Recommended: 3. High hop limits can cause network congestion.") - else: - logger.info(f"Local Node Hop Limit: {hop_limit} (OK)") - except Exception as e: - logger.warning(f"Could not verify hop limit: {e}") - - except Exception as e: - logger.error(f"Failed to check local config: {e}") - - def stop(self): + def stop(self) -> None: self.running = False if self.interface: self.interface.close() - def on_receive(self, packet, interface): - try: - # Store packet for analysis - # We need: id, fromId, hopLimit (if available) + def on_receive(self, packet: dict, interface) -> None: + """ + Callback for received packets. + """ # We need: id, fromId, hopLimit (if available) pkt_info = { 'id': packet.get('id'), 'fromId': packet.get('fromId'), @@ -290,10 +232,16 @@ class MeshMonitor: except Exception as e: logger.error(f"Error parsing packet: {e}") - def on_connection(self, interface, topic=pub.AUTO_TOPIC): + def on_connection(self, interface, topic=pub.AUTO_TOPIC) -> None: + """ + Callback for connection established. + """ logger.info("Connection established signal received.") - def on_node_info(self, node, interface): + def on_node_info(self, nodeInfo: dict, interface) -> None: + """ + Callback for node info updates. + """ # logger.debug(f"Node info updated: {node}") pass @@ -318,7 +266,7 @@ class MeshMonitor: node['position']['longitude'] = pos['lon'] logger.debug(f"Applied manual position to {node_id}: {pos}") - def main_loop(self): + def main_loop(self) -> None: logger.info("Starting monitoring loop...") while self.running: try: diff --git a/mesh_analyzer/reporter.py b/mesh_analyzer/reporter.py index 1ae44b3..2cbe869 100644 --- a/mesh_analyzer/reporter.py +++ b/mesh_analyzer/reporter.py @@ -17,14 +17,22 @@ class NetworkReporter: # Ensure report directory exists os.makedirs(self.report_dir, exist_ok=True) - def generate_report(self, nodes, test_results, analysis_issues, local_node=None, router_stats=None, analyzer=None, override_timestamp=None, override_location=None, save_json=True): + def generate_report(self, nodes: dict, test_results: list, analysis_issues: list, local_node: dict = None, router_stats: list = None, analyzer: object = None, override_timestamp: str = None, override_location: str = None, save_json: bool = True, output_filename: str = None) -> str: """ Generates a Markdown report based on collected data. Also persists all raw data to JSON format. - analyzer: NetworkHealthAnalyzer instance with cluster_data and ch_util_data - override_timestamp: Optional timestamp string to use (for regeneration) - override_location: Optional location string to use (for regeneration) - save_json: Whether to save the raw data to JSON (default: True) + + Args: + nodes: Dictionary of nodes + test_results: List of test results + analysis_issues: List of analysis issue strings + local_node: Local node information + router_stats: Router statistics + analyzer: NetworkHealthAnalyzer instance with cluster_data and ch_util_data + override_timestamp: Optional timestamp string to use (for regeneration) + override_location: Optional location string to use (for regeneration) + save_json: Whether to save the raw data to JSON (default: True) + output_filename: Optional custom filename for the report (without extension) """ if override_timestamp: timestamp = override_timestamp @@ -32,9 +40,17 @@ class NetworkReporter: else: timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") report_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # Use custom filename if provided, otherwise use timestamp-based name + if output_filename: + filename = output_filename if output_filename.endswith('.md') else f"{output_filename}.md" + # Extract base name without extension for JSON + base_name = output_filename.replace('.md', '') + json_filename = f"{base_name}.json" + else: + filename = f"report-{timestamp}.md" + json_filename = f"report-{timestamp}.json" - filename = f"report-{timestamp}.md" - json_filename = f"report-{timestamp}.json" filepath = os.path.join(self.report_dir, filename) json_filepath = os.path.join(self.report_dir, json_filename) diff --git a/mesh_analyzer/utils.py b/mesh_analyzer/utils.py index 2822867..d56744b 100644 --- a/mesh_analyzer/utils.py +++ b/mesh_analyzer/utils.py @@ -3,7 +3,7 @@ import logging logger = logging.getLogger(__name__) -def haversine(lat1, lon1, lat2, lon2): +def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Calculate the great circle distance between two points on the earth (specified in decimal degrees). @@ -28,7 +28,7 @@ def haversine(lat1, lon1, lat2, lon2): logger.debug(f"Error calculating haversine distance: {e}") return 0 -def get_val(obj, key, default=None): +def get_val(obj: object, key: str, default: any = None) -> any: """ Safely retrieves a value from an object or dictionary. Handles nested attribute access if key contains dots (e.g. 'user.id'). @@ -50,7 +50,7 @@ def get_val(obj, key, default=None): except Exception: return default -def get_node_name(node, node_id=None): +def get_node_name(node: dict, node_id: str = None) -> str: """ Helper to get a human-readable name for a node. """ diff --git a/reports/.gitignore b/reports/.gitignore new file mode 100644 index 0000000..86d0cb2 --- /dev/null +++ b/reports/.gitignore @@ -0,0 +1,4 @@ +# Ignore everything in this directory +* +# Except this file +!.gitignore \ No newline at end of file diff --git a/scripts/report_generate.py b/scripts/report_generate.py index 0f1733f..4a00224 100755 --- a/scripts/report_generate.py +++ b/scripts/report_generate.py @@ -114,44 +114,10 @@ def generate_report_from_json(json_filepath, output_path=None): # Use new issues for the report analysis_issues = new_issues - # We need to temporarily override the filename generation if custom output is specified + # Determine output filename if provided + output_filename = None if output_path: - # Monkey-patch the generate_report to use custom filename - original_generate = reporter.generate_report - - def custom_generate(nodes, test_results, analysis_issues, local_node=None, router_stats=None, analyzer=None, override_timestamp=None, override_location=None, save_json=True): - # Temporarily change the method to use custom filename - timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") - custom_filename = os.path.basename(output_path) - filepath = os.path.join(report_dir, custom_filename) - - from mesh_analyzer.route_analyzer import RouteAnalyzer - route_analyzer = RouteAnalyzer(nodes) - route_analysis_local = route_analyzer.analyze_routes(test_results) - - try: - with open(filepath, "w") as f: - f.write(f"# Meshtastic Network Report\n") - f.write(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write(f"**Regenerated from:** {os.path.basename(json_filepath)}\n\n") - - reporter._write_executive_summary(f, nodes, test_results, analysis_issues, local_node) - reporter._write_network_health(f, analysis_issues, analyzer) - - if router_stats: - reporter._write_router_performance_table(f, router_stats) - - reporter._write_route_analysis(f, route_analysis_local) - reporter._write_traceroute_results(f, test_results, nodes, local_node) - reporter._write_recommendations(f, analysis_issues, test_results, analyzer) - - print(f"✅ Report regenerated successfully: {filepath}") - return filepath - except Exception as e: - print(f"❌ Failed to generate report: {e}") - return None - - reporter.generate_report = custom_generate + output_filename = os.path.basename(output_path) # Extract session metadata # Use the 'session' variable already extracted from 'full_data' @@ -168,7 +134,8 @@ def generate_report_from_json(json_filepath, output_path=None): analyzer=analyzer, # Pass analyzer parameter override_timestamp=original_timestamp, override_location=test_location, - save_json=False # Do not overwrite JSON when regenerating + save_json=False, # Do not overwrite JSON when regenerating + output_filename=output_filename ) return result