"""Markdown / HTML / JSON report generation for mesh network test sessions."""

import io
import json
import logging
import os
import re
import time
from datetime import datetime
from typing import Optional

import markdown

from mesh_analyzer.route_analyzer import RouteAnalyzer

from .utils import get_val, haversine, get_node_name

logger = logging.getLogger(__name__)

# File extensions a caller may have left on ``output_filename``.
_REPORT_EXTENSIONS = ('.md', '.html')

# Stored in the JSON dump wherever an object graph refers back to itself.
_CIRCULAR_PLACEHOLDER = "<circular reference>"

# Patterns used to pull numbers out of "Router may be ineffective" issue strings.
_ROUTER_RELAYS_RE = re.compile(r"Router '([^']+)' has (\d+) relays")
_ROUTER_CH_UTIL_RE = re.compile(r"Router ChUtil: ([\d.]+)%")
_CLIENT_RELAYS_RE = re.compile(r"client '([^']+)' \([^)]+\) has (\d+) relays")

# Basic CSS for better readability of the HTML report.
_HTML_CSS = """<style>
body { font-family: sans-serif; max-width: 1000px; margin: 2em auto; padding: 0 1em; line-height: 1.5; }
table { border-collapse: collapse; margin: 1em 0; }
th, td { border: 1px solid #ccc; padding: 4px 10px; text-align: left; }
code { background: #f4f4f4; padding: 1px 4px; }
</style>"""


class NetworkReporter:
    """Renders collected network data as Markdown/HTML reports plus a raw JSON dump.

    The output formats are selected through the ``report_output_formats``
    config key (a list containing ``'markdown'`` and/or ``'html'``); when the
    key is absent only Markdown is written.
    """

    def __init__(self, report_dir="reports", config=None):
        """Remember the output directory and config, creating the directory if needed."""
        self.report_dir = report_dir
        self.config = config or {}
        # Ensure report directory exists
        os.makedirs(self.report_dir, exist_ok=True)

    def generate_report(self, nodes: dict, test_results: list, analysis_issues: list,
                        local_node: dict = None, router_stats: list = None,
                        analyzer: object = None, override_timestamp: str = None,
                        override_location: str = None, save_json: bool = True,
                        output_filename: str = None) -> Optional[str]:
        """
        Generates a Markdown and/or HTML report based on collected data.
        Also persists all raw data to JSON format.

        Args:
            nodes: Dictionary of nodes
            test_results: List of test results
            analysis_issues: List of analysis issue strings
            local_node: Local node information
            router_stats: Router statistics
            analyzer: NetworkHealthAnalyzer instance with cluster_data and ch_util_data
            override_timestamp: Optional timestamp string to use (for regeneration),
                formatted as ``%Y%m%d-%H%M%S``
            override_location: Optional location string to use (for regeneration)
            save_json: Whether to save the raw data to JSON (default: True)
            output_filename: Optional custom filename for the report (without extension;
                a trailing ``.md`` / ``.html`` is stripped)

        Returns:
            Path of the first generated report file (Markdown when enabled),
            or ``None`` when nothing was written or generation failed.
        """
        # Take a single clock reading so the filename stamp and the header date
        # can never disagree.
        if override_timestamp:
            timestamp = override_timestamp
            report_moment = datetime.strptime(timestamp, "%Y%m%d-%H%M%S")
        else:
            report_moment = datetime.now()
            timestamp = report_moment.strftime("%Y%m%d-%H%M%S")
        report_date = report_moment.strftime('%Y-%m-%d %H:%M:%S')

        # Determine base filename
        if output_filename:
            base_name = self._strip_report_extension(output_filename)
        else:
            base_name = f"report-{timestamp}"
        json_filepath = os.path.join(self.report_dir, f"{base_name}.json")

        logger.info(f"Generating network report: {base_name}")

        # Run Route Analysis
        route_analyzer = RouteAnalyzer(nodes)
        route_analysis = route_analyzer.analyze_routes(test_results)

        try:
            # Calculate location if not overridden
            test_location = override_location or self._get_location_string(nodes, local_node)

            # --- Generate Report Content ---
            # The markdown is built in memory first so every output format
            # is rendered from the same text.
            with io.StringIO() as f:
                f.write("# Meshtastic Network Report\n")
                f.write(f"**Date:** {report_date}\n\n")

                # 1. Executive Summary
                self._write_executive_summary(f, nodes, test_results, analysis_issues, test_location)
                # 2. Network Health (Analysis Findings)
                self._write_network_health(f, analysis_issues, analyzer)
                # 2.1 Router Performance Table
                if router_stats:
                    self._write_router_performance_table(f, router_stats)
                # 3. Route Analysis
                self._write_route_analysis(f, route_analysis)
                # 4. Traceroute Results
                self._write_traceroute_results(f, test_results, nodes, local_node)
                # 5. Recommendations
                self._write_recommendations(f, analysis_issues, test_results, analyzer)

                markdown_content = f.getvalue()

            # --- Output to Files ---
            output_formats = self.config.get('report_output_formats', ['markdown'])
            generated_files = []

            # 1. Markdown Output
            if 'markdown' in output_formats:
                md_filepath = os.path.join(self.report_dir, f"{base_name}.md")
                with open(md_filepath, "w", encoding='utf-8') as md_file:
                    md_file.write(markdown_content)
                generated_files.append(md_filepath)
                logger.info(f"Report generated: {md_filepath}")

            # 2. HTML Output
            if 'html' in output_formats:
                html_filepath = os.path.join(self.report_dir, f"{base_name}.html")
                html_content = markdown.markdown(markdown_content, extensions=['tables', 'fenced_code'])
                # The charset declaration matters: the report contains
                # non-ASCII glyphs and is written as UTF-8.
                full_html = (
                    "<!DOCTYPE html>\n"
                    "<html>\n"
                    "<head>\n"
                    "<meta charset=\"utf-8\">\n"
                    f"<title>Meshtastic Network Report - {report_date}</title>\n"
                    f"{_HTML_CSS}\n"
                    "</head>\n"
                    "<body>\n"
                    f"{html_content}\n"
                    "</body>\n"
                    "</html>\n"
                )
                with open(html_filepath, "w", encoding='utf-8') as html_file:
                    html_file.write(full_html)
                generated_files.append(html_filepath)
                logger.info(f"Report generated: {html_filepath}")

            # --- Persist Raw Data to JSON ---
            if save_json:
                # A JSON failure must not discard reports that were already written.
                try:
                    self._save_json_data(
                        json_filepath, timestamp, nodes, test_results, analysis_issues,
                        local_node, router_stats, route_analysis, test_location
                    )
                    logger.info(f"Raw data saved to: {json_filepath}")
                except Exception as json_e:
                    logger.error(f"Failed to save JSON data: {json_e}")

            # Return the primary file path (prefer markdown if available, else first generated)
            if generated_files:
                return generated_files[0]
            return None

        except Exception as e:
            # Top-level boundary: report generation is best-effort for callers,
            # but keep the traceback in the log.
            logger.exception(f"Failed to generate report: {e}")
            return None

    @staticmethod
    def _strip_report_extension(filename):
        """Remove trailing ``.md`` / ``.html`` suffixes, leaving the rest of the name intact."""
        trimmed = True
        while trimmed:
            trimmed = False
            for ext in _REPORT_EXTENSIONS:
                if filename.endswith(ext):
                    filename = filename[:-len(ext)]
                    trimmed = True
        return filename

    @staticmethod
    def _json_key(key):
        """Return ``key`` unchanged when JSON accepts it as an object key, else its ``str()``."""
        if key is None or isinstance(key, (str, int, float, bool)):
            return key
        return str(key)

    def _serialize_object(self, obj, visited=None):
        """
        Recursively convert objects to JSON-serializable format.

        Lists/tuples become lists, dicts are converted value by value, objects
        exposing ``__dict__`` are converted through it, and anything else is
        stringified. An object that is encountered again while it is still
        being converted (a circular reference) is replaced by a placeholder
        string instead of recursing forever.
        """
        if visited is None:
            visited = set()

        # Primitives are returned as-is and never tracked by id.
        if obj is None or isinstance(obj, (str, int, float, bool)):
            return obj

        obj_id = id(obj)
        if obj_id in visited:
            return _CIRCULAR_PLACEHOLDER

        visited.add(obj_id)
        try:
            if isinstance(obj, (list, tuple)):
                return [self._serialize_object(item, visited) for item in obj]
            if isinstance(obj, dict):
                return {
                    self._json_key(key): self._serialize_object(value, visited)
                    for key, value in obj.items()
                }
            if hasattr(obj, '__dict__'):
                return self._serialize_object(obj.__dict__, visited)
            # Fallback: convert to string
            return str(obj)
        finally:
            # Only the current ancestry chain counts as "visited": the same
            # object may legitimately appear in two sibling branches.
            visited.discard(obj_id)

    def _save_json_data(self, filepath, timestamp, nodes, test_results, analysis_issues,
                        local_node, router_stats, route_analysis, test_location):
        """
        Saves all raw data to JSON file with session metadata.

        The file holds a ``session`` object (timestamp, generation time, test
        location, config) and a ``data`` object with the serialized inputs.
        """
        # Serialize local_node
        local_node_data = None
        if local_node:
            if hasattr(local_node, '__dict__') or isinstance(local_node, dict):
                local_node_data = self._serialize_object(local_node)
            else:
                local_node_data = str(local_node)

        # Build the JSON structure
        data = {
            "session": {
                "timestamp": timestamp,
                "generated_at": datetime.now().isoformat(),
                "test_location": test_location,
                "config": self._serialize_object(self.config),
            },
            "data": {
                "nodes": self._serialize_object(nodes),
                "test_results": self._serialize_object(test_results),
                "analysis_issues": analysis_issues,  # Already a list of strings
                "router_stats": self._serialize_object(router_stats),
                "route_analysis": self._serialize_object(route_analysis),
                "local_node": local_node_data,
            },
        }

        # Write to file with pretty formatting
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, default=str)

    @staticmethod
    def _resolve_local_node_id(local_node):
        """
        Derive the ``!xxxxxxxx`` node ID used as key in the nodes dict.

        Accepts either an object exposing ``nodeNum`` or a dict carrying
        ``nodeNum`` and/or ``user.id``. Returns ``None`` when no ID can be
        determined.
        """
        if not local_node:
            return None

        if hasattr(local_node, 'nodeNum'):
            try:
                return f"!{int(local_node.nodeNum):08x}"
            except (TypeError, ValueError):
                return None

        if isinstance(local_node, dict):
            if 'nodeNum' in local_node:
                try:
                    return f"!{int(local_node['nodeNum']):08x}"
                except (TypeError, ValueError):
                    pass  # fall through to the user id
            user = local_node.get('user') or {}
            if isinstance(user, dict) and user.get('id'):
                return user['id']

        return None

    @staticmethod
    def _read_coordinate(position, key):
        """Read ``key`` from a position given as dict or attribute-style object."""
        if isinstance(position, dict):
            return position.get(key)
        return getattr(position, key, None)

    def _get_location_string(self, nodes, local_node):
        """
        Determines the location string for the report.

        Returns ``"<lat>, <lon>"`` with four decimals, or ``"Unknown"`` when
        no position is available for the local node.
        """
        if not local_node:
            return "Unknown"

        # Prefer the nodes dict (which has the most up-to-date position
        # including manual overrides).
        local_id = self._resolve_local_node_id(local_node)
        if local_id and local_id in nodes:
            pos = get_val(nodes[local_id], 'position', {})
            lat = get_val(pos, 'latitude')
            lon = get_val(pos, 'longitude')
            if lat is not None and lon is not None:
                return f"{lat:.4f}, {lon:.4f}"

        # Fallback to local_node object if not found in dict or no ID
        if isinstance(local_node, dict):
            pos = local_node.get('position') or {}
        elif hasattr(local_node, 'position'):
            pos = local_node.position
        else:
            return "Unknown"

        lat = self._read_coordinate(pos, 'latitude')
        lon = self._read_coordinate(pos, 'longitude')
        if lat is not None and lon is not None:
            return f"{lat:.4f}, {lon:.4f}"
        return "Unknown"

    def _write_executive_summary(self, f, nodes, test_results, analysis_issues, test_location="Unknown"):
        """Write section 1: headline counts for nodes, tests and critical issues."""
        f.write("## 1. Executive Summary\n")

        total_nodes = len(nodes)
        total_tests = len(test_results)
        successful_tests = sum(1 for r in test_results if r.get('status') == 'success')
        success_rate = (successful_tests / total_tests * 100) if total_tests > 0 else 0
        critical_issues = sum(1 for i in analysis_issues if "Critical" in i or "Congestion" in i)

        # Get unique nodes from test results (selected online nodes)
        unique_tested_nodes = len({r.get('node_id') for r in test_results})

        f.write(f"- **Test Location:** {test_location}\n")
        f.write(f"- **Total Nodes Visible:** {total_nodes}\n")
        f.write(f"- **Selected Online Nodes:** {unique_tested_nodes}\n")
        f.write(f"- **Total Tests Performed:** {total_tests}\n")
        f.write(f"- **Test Success Rate:** {success_rate:.1f}%\n")
        f.write(f"- **Critical Issues Found:** {critical_issues}\n\n")

    def _write_route_analysis(self, f, analysis):
        """Write section 3: relay usage, bottlenecks and most common paths."""
        f.write("## 3. Route Analysis\n")

        if not analysis:
            f.write("No route analysis data available (no successful traceroutes).\n\n")
            return

        # 3.1 Relay Usage
        f.write("### 3.1 Top Relays (Backbone Nodes)\n")
        relays = analysis.get('relay_usage', [])
        if relays:
            f.write("| Node ID | Name | Times Used as Relay |\n")
            f.write("|---|---|---|\n")
            for r in relays[:10]:  # Top 10
                f.write(f"| `{r['id']}` | {r['name']} | {r['count']} |\n")
            f.write("\n")
        else:
            f.write("No intermediate relays detected in successful traceroutes.\n\n")

        # 3.2 Bottlenecks
        f.write("### 3.2 Potential Bottlenecks (High Centrality)\n")
        bottlenecks = analysis.get('bottlenecks', [])
        if bottlenecks:
            f.write("Nodes that appear in routes to multiple different destinations:\n\n")
            f.write("| Node ID | Name | Destinations Served |\n")
            f.write("|---|---|---|\n")
            for b in bottlenecks:
                f.write(f"| `{b['id']}` | {b['name']} | {b['destinations_served']} |\n")
            f.write("\n")
        else:
            f.write("No significant bottlenecks identified.\n\n")

        # 3.3 Common Paths
        f.write("### 3.3 Most Common Paths\n")
        paths = analysis.get('common_paths', {})
        if paths:
            f.write("| Destination | Most Common Path | Stability |\n")
            f.write("|---|---|---|\n")
            for dest, data in paths.items():
                stability = f"{data['stability']:.1f}%"
                path = data['path'].replace("->", "→")
                f.write(f"| `{dest}` | {path} | {stability} |\n")
            f.write("\n")
        else:
            f.write("No path data available.\n\n")

    def _write_network_health(self, f, analysis_issues, analyzer=None):
        """Write section 2: analysis issues grouped by category.

        Recommendation text embedded in issue strings is cut off here; the
        recommendations themselves are rendered in their own section.
        """
        f.write("## 2. Network Health Analysis\n")

        if not analysis_issues:
            f.write("No significant network issues detected.\n\n")
            return

        # Phrases that introduce the recommendation part of an issue string,
        # checked in this order.
        recommendation_markers = (
            "Best positioned seems to be",  # Topology: High Router Density
            "Consider changing",
            "If using",                     # Network Size
            "Consolidate?",                 # Efficiency
        )

        def clean_issue(issue):
            """Drop everything from the first matching recommendation marker on."""
            for marker in recommendation_markers:
                if marker in issue:
                    return issue.split(marker)[0].strip()
            return issue

        def write_bullets(items):
            for item in items:
                f.write(f"- {item}\n")

        # Group issues by type (classification uses the uncleaned string)
        congestion = []
        config = []
        topology = []
        other = []

        for issue in analysis_issues:
            cleaned_issue = clean_issue(issue)
            if "Congestion" in issue or "Spam" in issue:
                congestion.append(cleaned_issue)
            elif "Config" in issue or "Role" in issue:
                config.append(cleaned_issue)
            elif "Topology" in issue or "Density" in issue or "hops away" in issue:
                topology.append(cleaned_issue)
            elif "Efficiency" in issue or "Route Quality" in issue:
                pass  # Handled in separate sections
            else:
                other.append(cleaned_issue)

        if congestion:
            f.write("### Congestion & Airtime\n")
            write_bullets(congestion)
            f.write("\n")

        if config:
            f.write("### Configuration Issues\n")
            write_bullets(config)
            f.write("\n")

        if topology:
            f.write("### Topology & Placement\n")
            write_bullets(topology)

            # Add detailed cluster distance information
            if analyzer and hasattr(analyzer, 'cluster_data') and analyzer.cluster_data:
                f.write("\n**Router Cluster Details:**\n\n")
                for cluster in analyzer.cluster_data:
                    f.write(f"**Cluster of {cluster['size']} routers:**\n")
                    f.write(f" - Best positioned: {cluster['best_router']} ({cluster['best_router_relays']} relays)\n")
                    f.write(" - Distances:\n")
                    for dist_info in cluster['distances']:
                        f.write(f" - {dist_info['router1']} ↔ {dist_info['router2']}: {dist_info['distance_m']/1000:.2f}km\n")
                    f.write("\n")
            f.write("\n")

        if other:
            f.write("### Other Findings\n")
            write_bullets(other)
            f.write("\n")

        # Separate section for Efficiency
        efficiency = [clean_issue(i) for i in analysis_issues if "Efficiency" in i]
        if efficiency:
            f.write("### Router Efficiency Analysis\n")
            f.write("Analysis of router placement, congestion, and relay performance.\n\n")
            for i in efficiency:
                clean_msg = i.replace("Efficiency: ", "")
                f.write(f"- {clean_msg}\n")
            f.write("\n")

        # Separate section for Route Quality
        quality = [clean_issue(i) for i in analysis_issues if "Route Quality" in i]
        if quality:
            f.write("### Route Quality Analysis\n")
            f.write("Analysis of path efficiency and stability.\n\n")
            for i in quality:
                clean_msg = i.replace("Route Quality: ", "")
                f.write(f"- {clean_msg}\n")
            f.write("\n")

    def _write_router_performance_table(self, f, router_stats):
        """Write the per-router table (neighbours, nearby routers, ChUtil, relays, status)."""
        f.write("### Router Performance Table\n")
        if not router_stats:
            f.write("No routers found.\n\n")
            return

        # Column headers show the radii (config values in metres) in kilometres.
        cluster_radius_m = self.config.get('cluster_radius', 3000)
        router_density_m = self.config.get('thresholds', {}).get('router_density_threshold', 2000)
        cluster_radius_km = cluster_radius_m / 1000.0
        router_density_km = router_density_m / 1000.0

        f.write(f"| Name | Role | Neighbors ({cluster_radius_km:.1f}km) | Routers ({router_density_km:.1f}km) | ChUtil | Relayed | Status |\n")
        f.write("|---|---|---|---|---|---|---|\n")

        for s in router_stats:
            # Handle backward compatibility if keys are missing
            neighbors = s.get('neighbors', s.get('neighbors_2km', 0))
            routers_nearby = s.get('routers_nearby', s.get('routers_2km', 0))
            f.write(f"| {s['name']} | {s['role']} | {neighbors} | {routers_nearby} | {s['ch_util']:.1f}% | {s['relay_count']} | {s['status']} |\n")
        f.write("\n")

    def _local_coordinates(self, nodes, local_node):
        """Return ``(lat, lon)`` of the local node from the nodes dict, or ``None``."""
        local_node_id = self._resolve_local_node_id(local_node)
        if not local_node_id:
            return None

        local_node_data = nodes.get(local_node_id)
        if not local_node_data:
            return None

        local_pos = get_val(local_node_data, 'position', {})
        my_lat = get_val(local_pos, 'latitude')
        my_lon = get_val(local_pos, 'longitude')
        if my_lat is None or my_lon is None:
            return None
        return my_lat, my_lon

    @staticmethod
    def _distance_km(nodes, local_coords, node_id):
        """Format the distance from ``local_coords`` to ``node_id`` in km, or ``'-'``."""
        if local_coords is None:
            return '-'

        node = nodes.get(node_id)
        if not node:
            return '-'

        target_pos = get_val(node, 'position', {})
        target_lat = get_val(target_pos, 'latitude')
        target_lon = get_val(target_pos, 'longitude')
        if target_lat is None or target_lon is None:
            return '-'

        dist_meters = haversine(local_coords[0], local_coords[1], target_lat, target_lon)
        # A zero distance is shown as unknown rather than "0.00".
        if dist_meters > 0:
            return f"{dist_meters/1000:.2f}"
        return '-'

    def _write_traceroute_results(self, f, test_results, nodes, local_node=None):
        """Write section 4: one table row per traceroute test."""
        f.write("## 4. Traceroute Results\n")

        if not test_results:
            f.write("No active tests performed in this cycle.\n\n")
            return

        f.write("| Node ID | Name | Status | Distance (km) | RTT (s) | Hops (To/Back) | SNR |\n")
        f.write("|---|---|---|---|---|---|---|\n")

        # The local position is the same for every row, so resolve it once.
        local_coords = self._local_coordinates(nodes, local_node)

        for res in test_results:
            node_id = res.get('node_id')
            node = nodes.get(node_id, {})
            name = get_node_name(node, node_id)
            status = res.get('status', 'unknown')
            distance = self._distance_km(nodes, local_coords, node_id)
            rtt = res.get('rtt', '-')
            hops_to = res.get('hops_to', '-')
            hops_back = res.get('hops_back', '-')
            snr = res.get('snr', '-')

            # Format RTT
            if isinstance(rtt, (int, float)):
                rtt = f"{rtt:.2f}"

            # Format hops
            if hops_to != '-' and hops_back != '-':
                hops = f"{hops_to}/{hops_back}"
            else:
                hops = '-'

            status_icon = "✅" if status == 'success' else "❌"
            f.write(f"| {node_id} | {name} | {status_icon} {status} | {distance} | {rtt} | {hops} | {snr} |\n")
        f.write("\n")

    def _cluster_recommendations(self, analysis_issues, analyzer):
        """Build router-cluster recommendations (emitted once, however many issues match)."""
        if not any("Topology: High Router Density!" in issue for issue in analysis_issues):
            return []

        clusters = getattr(analyzer, 'cluster_data', None) if analyzer else None
        if not clusters:
            # Fallback if no analyzer data
            return [(1, "🔴", "**Optimize Placement:** Routers are too close together. Convert redundant routers to clients.")]

        # Get threshold from config (default to 2000m if not available)
        threshold = self.config.get('thresholds', {}).get('router_density_threshold', 2000)

        recs = []
        for cluster in clusters:
            # A cluster is CRITICAL when any pair is closer than the threshold.
            min_distance = min((d['distance_m'] for d in cluster['distances']), default=threshold)
            if min_distance < threshold:
                rec = f"**Router Cluster:** {cluster['size']} routers within {threshold/1000:.1f}km threshold (closest: {min_distance/1000:.2f}km). "
                rec += f"Best positioned: '{cluster['best_router']}' ({cluster['best_router_relays']} relays). "
                rec += f"Consider changing others ({', '.join(cluster['other_routers'])}) to CLIENT role."
                recs.append((1, "🔴", rec))
            else:
                rec = f"**Router Cluster:** {cluster['size']} routers detected. "
                rec += f"Best positioned: '{cluster['best_router']}' ({cluster['best_router_relays']} relays). "
                rec += f"Review if all routers are needed: {', '.join(cluster['other_routers'])}."
                recs.append((2, "🟡", rec))
        return recs

    def _channel_utilization_recommendations(self, analysis_issues, analyzer):
        """Build channel-utilization recommendations (mesh-wide, isolated, or generic)."""
        ch_data = getattr(analyzer, 'ch_util_data', None) if analyzer else None

        if ch_data and ch_data.get('type', 'none') != 'none':
            if ch_data['type'] == 'widespread':
                # CRITICAL: Mesh-wide congestion
                rec = f"**Mesh-Wide Congestion:** {ch_data['affected_count']} out of {ch_data['active_count']} active nodes have high channel utilization (>{self.config.get('thresholds', {}).get('channel_utilization', 25)}%). "
                rec += "Consider switching to a faster Meshtastic preset (e.g., LONG_FAST → MEDIUM_FAST or SHORT_FAST). "
                rec += "Note: Faster presets increase throughput but reduce range. Choose based on your deployment area."
                return [(1, "🔴", rec)]

            # WARNING: Isolated congestion
            rec = "**High Channel Utilization** on specific nodes:\n"
            for node in ch_data['nodes'][:5]:  # Top 5
                rec += f"\n - {node['name']}: {node['util_pct']:.1f}%"
            rec += "\n\nCheck these nodes for message spamming or reduce their broadcast frequency."
            return [(2, "🟡", rec)]

        if any("Congestion" in i or "Congested" in i for i in analysis_issues):
            # Fallback if no analyzer data
            return [(2, "🟡", "**Reduce Traffic:** High channel utilization detected. Identify spamming nodes, reduce broadcast frequency, or optimize network preset.")]
        return []

    def _ineffective_router_recommendations(self, analysis_issues):
        """Build recommendations for routers that relay less than a nearby client."""
        ch_util_threshold = self.config.get('thresholds', {}).get('channel_utilization', 25.0)

        recs = []
        for issue in analysis_issues:
            if "Router may be ineffective" not in issue:
                continue

            # Parse the issue to extract router name, relay counts and ChUtil
            router_match = _ROUTER_RELAYS_RE.search(issue)
            ch_util_match = _ROUTER_CH_UTIL_RE.search(issue)
            client_match = _CLIENT_RELAYS_RE.search(issue)

            if not (router_match and ch_util_match and client_match):
                # Fallback if parsing fails
                rec = issue.replace("Efficiency: ", "").replace("Router may be ineffective - check antenna, placement, or configuration.", "Consider changing the router to CLIENT role or check its antenna/placement/configuration.")
                recs.append((1, "🔴", rec))
                continue

            router_name = router_match.group(1)
            router_relays = int(router_match.group(2))
            router_ch_util = float(ch_util_match.group(1))
            client_name = client_match.group(1)
            client_relays = int(client_match.group(2))

            # Detect mesh-clogging scenario: low relays + high ChUtil
            is_mesh_clogger = (router_relays < client_relays / 2) and (router_ch_util > ch_util_threshold)

            if is_mesh_clogger:
                rec = f"**Mesh-Clogger Router:** '{router_name}' has high channel utilization ({router_ch_util:.1f}%) but low relay activity ({router_relays} relays), "
                rec += f"while nearby client '{client_name}' is doing more work ({client_relays} relays). "
                rec += f"This router is likely clogging the mesh. **Strongly recommend changing '{router_name}' to CLIENT role**."
            else:
                # Ineffective but not clogging; also reported as CRITICAL.
                rec = f"**Ineffective Router:** '{router_name}' has {router_relays} relays (ChUtil: {router_ch_util:.1f}%), "
                rec += f"but nearby client '{client_name}' has {client_relays} relays. "
                rec += f"Consider changing '{router_name}' to CLIENT role or check its antenna/placement."
            recs.append((1, "🔴", rec))
        return recs

    def _write_recommendations(self, f, analysis_issues, test_results, analyzer=None):
        """Write section 5: recommendations ordered by priority.

        Each recommendation is a ``(priority, emoji, text)`` tuple where
        priority 1 is critical, 2 is a warning and 3 is informational. The
        sort is stable, so entries of equal priority keep insertion order.
        """
        f.write("## 5. Recommendations\n")

        recs = []  # Format: (priority, emoji, text)

        # === CRITICAL PRIORITY ===
        # 1. Router clusters closer than the configured density threshold
        recs.extend(self._cluster_recommendations(analysis_issues, analyzer))

        # 2. Channel Utilization (Mesh-Wide or Isolated)
        recs.extend(self._channel_utilization_recommendations(analysis_issues, analyzer))

        # 3. Ineffective Routers (clients relaying more than routers)
        recs.extend(self._ineffective_router_recommendations(analysis_issues))

        # === WARNING PRIORITY ===
        # 4. Long Paths
        if any("Long path" in i for i in analysis_issues):
            recs.append((2, "🟡", "**Optimize Paths:** Long paths (>3 hops) detected. Consider adding a strategically placed relay to shorten the path."))

        # 5. Redundant Routers
        if any("Redundant" in i for i in analysis_issues):
            recs.append((2, "🟡", "**Reduce Redundancy:** Some routers have too many other routers nearby. Evaluate if all are necessary and consider changing some to CLIENT role to save airtime."))

        # === INFO PRIORITY ===
        # 6. Configuration
        if any("ROUTER_CLIENT" in i for i in analysis_issues):
            recs.append((3, "🟢", "**Fix Roles:** Deprecated `ROUTER_CLIENT` role detected. Change these nodes to `CLIENT` or `CLIENT_MUTE`."))

        if any("Network Size" in i for i in analysis_issues):
            recs.append((3, "🟢", "**Adjust Presets:** Network size exceeds recommendations for LONG_FAST preset. Consider switching to a faster preset (e.g., LONG_MODERATE or SHORT_FAST) to reduce collision probability."))

        # 7. Signal Quality
        if any("poor SNR" in i or "Weak signal" in i for i in analysis_issues):
            recs.append((3, "🟢", "**Check Hardware/LOS:** Nodes with poor SNR or weak signals may have antenna issues, bad placement, or obstructions."))

        if any("Favorite Router" in i for i in analysis_issues):
            recs.append((3, "🟢", "**Check Favorites:** Routes are using 'Favorite Router' nodes. Ensure this is intentional, as it forces specific paths."))

        # 8. Connectivity
        failures = [r for r in test_results if r.get('status') != 'success']
        if failures:
            recs.append((3, "🟢", f"**Investigate Connectivity:** {len(failures)} nodes failed traceroute tests. Check if they are online or if the path is broken."))

        # Sort by priority (1=CRITICAL first)
        recs.sort(key=lambda x: x[0])

        if not recs:
            f.write("Network looks healthy! Keep up the good work.\n")
        else:
            for _priority, emoji, rec_text in recs:
                f.write(f"{emoji} {rec_text}\n\n")
        f.write("\n")