feat: Refactor report generation to include JSON persistence and regeneration, supported by new test scripts.

This commit is contained in:
eddieoz
2025-11-28 15:32:51 +02:00
parent bf7412aa2a
commit ccf0e6fc7c
5 changed files with 549 additions and 12 deletions

View File

@@ -202,11 +202,11 @@ class NetworkHealthAnalyzer:
for s in stats:
if "Redundant" in s['status']:
issues.append(f"Efficiency: Router '{s['name']}' is Redundant. Has {s['routers_2km']} other routers within 2km. Consolidate?")
issues.append(f"Efficiency: Router '{s['name']}' is Redundant. Has {s['routers_nearby']} other routers within {s['radius']/1000:.1f}km. Consolidate?")
if "Congested" in s['status']:
issues.append(f"Efficiency: Router '{s['name']}' is Congested (ChUtil {s['ch_util']:.1f}% > 20%).")
if "Ineffective" in s['status']:
issues.append(f"Efficiency: Router '{s['name']}' is Ineffective. Has {s['neighbors_2km']} neighbors but relayed 0 packets in tests.")
issues.append(f"Efficiency: Router '{s['name']}' is Ineffective. Has {s['neighbors']} neighbors but relayed 0 packets in tests.")
return issues
@@ -413,9 +413,9 @@ class NetworkHealthAnalyzer:
# 3. Analyze Clusters and Generate Recommendations
for cluster in clusters:
# Sort by relay_count (desc), then neighbors_2km (desc)
# Sort by relay_count (desc), then neighbors (desc)
# We want the "best" router first
cluster.sort(key=lambda x: (x['relay_count'], x['neighbors_2km']), reverse=True)
cluster.sort(key=lambda x: (x['relay_count'], x['neighbors']), reverse=True)
best_router = cluster[0]
others = cluster[1:]

View File

@@ -26,7 +26,7 @@ class MeshMonitor:
self.hostname = hostname
self.config = self.load_config(config_file)
self.analyzer = NetworkHealthAnalyzer(config=self.config, ignore_no_position=ignore_no_position)
self.reporter = NetworkReporter()
self.reporter = NetworkReporter(report_dir="reports", config=self.config)
self.active_tester = None
self.running = False
self.config = self.load_config(config_file)
@@ -353,8 +353,8 @@ class MeshMonitor:
# Report Issues
if issues:
logger.warning(f"Found {len(issues)} potential issues:")
for issue in issues:
logger.warning(f" - {issue}")
# for issue in issues:
# logger.warning(f" - {issue}")
else:
logger.debug("No critical issues found in current scan.")

View File

@@ -1,6 +1,7 @@
import logging
import time
import os
import json
from datetime import datetime
from .utils import get_val, haversine, get_node_name
@@ -9,16 +10,23 @@ from mesh_monitor.route_analyzer import RouteAnalyzer
logger = logging.getLogger(__name__)
class NetworkReporter:
def __init__(self, report_dir="."):
def __init__(self, report_dir="reports", config=None):
    """Create a reporter that writes reports into *report_dir*.

    Args:
        report_dir: Output directory for markdown/JSON reports; created
            on construction if it does not already exist.
        config: Optional monitor configuration dict consulted while
            rendering report sections; defaults to an empty dict.
    """
    self.report_dir = report_dir
    self.config = config or {}
    # Ensure report directory exists
    os.makedirs(self.report_dir, exist_ok=True)
def generate_report(self, nodes, test_results, analysis_issues, local_node=None, router_stats=None):
"""
Generates a Markdown report based on collected data.
Also persists all raw data to JSON format.
"""
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
filename = f"report-{timestamp}.md"
json_filename = f"report-{timestamp}.json"
filepath = os.path.join(self.report_dir, filename)
json_filepath = os.path.join(self.report_dir, json_filename)
logger.info(f"Generating network report: {filepath}")
@@ -27,6 +35,7 @@ class NetworkReporter:
route_analysis = route_analyzer.analyze_routes(test_results)
try:
# --- Generate Markdown Report ---
with open(filepath, "w") as f:
# Header
f.write(f"# Meshtastic Network Report\n")
@@ -52,11 +61,103 @@ class NetworkReporter:
self._write_recommendations(f, analysis_issues, test_results)
logger.info(f"Report generated successfully: {filepath}")
# --- Persist Raw Data to JSON ---
try:
self._save_json_data(
json_filepath,
timestamp,
nodes,
test_results,
analysis_issues,
local_node,
router_stats,
route_analysis
)
logger.info(f"Raw data saved to: {json_filepath}")
except Exception as json_e:
logger.error(f"Failed to save JSON data: {json_e}")
return filepath
except Exception as e:
logger.error(f"Failed to generate report: {e}")
return None
def _serialize_object(self, obj, visited=None):
"""
Recursively convert objects to JSON-serializable format.
Handles protobuf objects, custom classes, and nested structures.
Prevents infinite recursion from circular references.
"""
if visited is None:
visited = set()
# Check for None and primitives first (before id check)
if obj is None:
return None
elif isinstance(obj, (str, int, float, bool)):
return obj
# Check for circular references using object id
obj_id = id(obj)
if obj_id in visited:
# Return a placeholder for circular references
return "<circular reference>"
# Mark this object as visited
visited.add(obj_id)
try:
if isinstance(obj, (list, tuple)):
return [self._serialize_object(item, visited) for item in obj]
elif isinstance(obj, dict):
return {key: self._serialize_object(value, visited) for key, value in obj.items()}
elif hasattr(obj, '__dict__'):
# Convert objects with __dict__ to dictionary
return self._serialize_object(obj.__dict__, visited)
else:
# Fallback: convert to string
return str(obj)
finally:
# Remove from visited set when done processing this branch
visited.discard(obj_id)
def _save_json_data(self, filepath, timestamp, nodes, test_results, analysis_issues,
local_node, router_stats, route_analysis):
"""
Saves all raw data to JSON file with session metadata.
"""
# Serialize local_node
local_node_data = None
if local_node:
if hasattr(local_node, '__dict__'):
local_node_data = self._serialize_object(local_node)
elif isinstance(local_node, dict):
local_node_data = self._serialize_object(local_node)
else:
local_node_data = str(local_node)
# Build the JSON structure
data = {
"session": {
"timestamp": timestamp,
"generated_at": datetime.now().isoformat(),
"config": self._serialize_object(self.config)
},
"data": {
"nodes": self._serialize_object(nodes),
"test_results": self._serialize_object(test_results),
"analysis_issues": analysis_issues, # Already a list of strings
"router_stats": self._serialize_object(router_stats),
"route_analysis": self._serialize_object(route_analysis),
"local_node": local_node_data
}
}
# Write to file with pretty formatting
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, default=str)
def _write_executive_summary(self, f, nodes, test_results, analysis_issues, local_node=None):
f.write("## 1. Executive Summary\n")
@@ -249,11 +350,14 @@ class NetworkReporter:
f.write("No routers found.\n\n")
return
# Get radius from first stat entry (default to 2000m if missing)
radius_m = router_stats[0].get('radius', 2000)
radius_km = radius_m / 1000.0
# Get cluster_radius and router_density_threshold from config
cluster_radius_m = self.config.get('cluster_radius', 3000)
router_density_m = self.config.get('thresholds', {}).get('router_density_threshold', 2000)
cluster_radius_km = cluster_radius_m / 1000.0
router_density_km = router_density_m / 1000.0
f.write(f"| Name | Role | Neighbors ({radius_km:.1f}km) | Routers ({radius_km:.1f}km) | ChUtil | Relayed | Status |\n")
f.write(f"| Name | Role | Neighbors ({cluster_radius_km:.1f}km) | Routers ({router_density_km:.1f}km) | ChUtil | Relayed | Status |\n")
f.write("|---|---|---|---|---|---|---|\n")
for s in router_stats:

167
report_generate.py Executable file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Report Generator Tool
Regenerates markdown reports from JSON data files saved by the LoRa Mesh Analyzer.
Usage:
python report_generate.py <json_file_path> [--output <output_path>]
Example:
python report_generate.py reports/report-20251128-145548.json
python report_generate.py reports/report-20251128-145548.json --output custom-report.md
"""
import json
import sys
import os
import argparse
from datetime import datetime
# Add mesh_monitor to path
sys.path.insert(0, os.path.dirname(__file__))
from mesh_monitor.reporter import NetworkReporter
from mesh_monitor.route_analyzer import RouteAnalyzer
def load_json_data(json_filepath):
    """
    Read and parse a JSON data file.

    Exits the process with status 1 (after printing a diagnostic) when
    the file is missing, is not valid JSON, or cannot be read.
    """
    if not os.path.exists(json_filepath):
        print(f"Error: File not found: {json_filepath}")
        sys.exit(1)
    try:
        with open(json_filepath, 'r') as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON file: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error loading file: {e}")
        sys.exit(1)
def generate_report_from_json(json_filepath, output_path=None):
    """
    Regenerate a markdown report from a previously saved JSON data file.

    Args:
        json_filepath: Path to the JSON file produced by NetworkReporter.
        output_path: Optional custom path for the regenerated markdown
            report; when omitted, a fresh timestamped report is written
            to the default reports/ directory.

    Returns:
        Path of the generated markdown file, or None on failure.
    """
    print(f"Loading data from: {json_filepath}")

    # Load the JSON data
    full_data = load_json_data(json_filepath)

    # Extract session and data
    session = full_data.get('session', {})
    data = full_data.get('data', {})

    # Extract all the components. route_analysis is not read here: the
    # reporter recomputes it from nodes + test_results.
    nodes = data.get('nodes', {})
    test_results = data.get('test_results', [])
    analysis_issues = data.get('analysis_issues', [])
    router_stats = data.get('router_stats', [])
    local_node = data.get('local_node')
    config = session.get('config', {})

    print(f"Session timestamp: {session.get('timestamp', 'Unknown')}")
    print(f"Nodes: {len(nodes)}")
    print(f"Test results: {len(test_results)}")
    print(f"Analysis issues: {len(analysis_issues)}")

    # Decide where the regenerated report should live.
    if output_path:
        report_dir = os.path.dirname(output_path) or "."
    else:
        report_dir = "reports"

    reporter = NetworkReporter(report_dir=report_dir, config=config)

    if output_path:
        # Monkey-patch generate_report so the custom filename is used
        # instead of the reporter's timestamped default.
        def custom_generate(nodes, test_results, analysis_issues, local_node=None, router_stats=None):
            custom_filename = os.path.basename(output_path)
            filepath = os.path.join(report_dir, custom_filename)
            route_analyzer = RouteAnalyzer(nodes)
            route_analysis_local = route_analyzer.analyze_routes(test_results)
            try:
                with open(filepath, "w") as f:
                    # BUG FIX: these writes previously emitted literal
                    # "\\n" sequences instead of real newlines, breaking
                    # the markdown header of regenerated reports.
                    f.write(f"# Meshtastic Network Report\n")
                    f.write(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                    f.write(f"**Regenerated from:** {os.path.basename(json_filepath)}\n\n")
                    reporter._write_executive_summary(f, nodes, test_results, analysis_issues, local_node)
                    reporter._write_network_health(f, analysis_issues)
                    if router_stats:
                        reporter._write_router_performance_table(f, router_stats)
                    reporter._write_route_analysis(f, route_analysis_local)
                    reporter._write_traceroute_results(f, test_results, nodes, local_node)
                    reporter._write_recommendations(f, analysis_issues, test_results)
                print(f"✅ Report regenerated successfully: {filepath}")
                return filepath
            except Exception as e:
                print(f"❌ Failed to generate report: {e}")
                return None

        reporter.generate_report = custom_generate

    # Generate the report
    result = reporter.generate_report(
        nodes=nodes,
        test_results=test_results,
        analysis_issues=analysis_issues,
        local_node=local_node,
        router_stats=router_stats
    )
    return result
def main():
    """CLI entry point: parse arguments and regenerate a report from JSON."""
    parser = argparse.ArgumentParser(
        description='Regenerate markdown reports from JSON data files',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python report_generate.py reports/report-20251128-145548.json
    python report_generate.py reports/report-20251128-145548.json --output custom-report.md
    """
    )
    parser.add_argument(
        'json_file',
        help='Path to the JSON data file'
    )
    parser.add_argument(
        '--output', '-o',
        help='Custom output path for the markdown report (optional)',
        default=None
    )
    args = parser.parse_args()

    # Generate the report
    generate_report_from_json(args.json_file, args.output)


if __name__ == "__main__":
    main()

266
test_report_refactoring.py Normal file
View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
Test script to verify the report generation refactoring.
Creates mock data and tests both JSON persistence and report regeneration.
"""
import sys
import os
import json
from datetime import datetime
# Add mesh_monitor to path
sys.path.insert(0, os.path.dirname(__file__))
from mesh_monitor.reporter import NetworkReporter
def create_mock_data():
    """Build a fixture dataset mirroring what the live monitor collects.

    Returns:
        A 6-tuple of (nodes, test_results, analysis_issues,
        router_stats, local_node, config).
    """
    def node_entry(node_id, long_name, short_name, lat, lon, ch_util, air_tx):
        # One nodedb-style record, keyed/shaped the way Meshtastic reports it.
        return {
            "user": {"id": node_id, "longName": long_name, "shortName": short_name},
            "position": {"latitude": lat, "longitude": lon},
            "deviceMetrics": {"channelUtilization": ch_util, "airUtilTx": air_tx},
        }

    nodes = {
        "!12345678": node_entry("!12345678", "Test Router 1", "TR1", 59.4370, 24.7536, 15.5, 2.3),
        "!87654321": node_entry("!87654321", "Test Router 2", "TR2", 59.4380, 24.7550, 8.2, 1.1),
    }

    # One successful traceroute and one timeout.
    test_results = [
        {"node_id": "!12345678", "status": "success", "rtt": 2.5,
         "hops_to": 2, "hops_back": 2, "snr": 8.5,
         "route": ["!local", "!relay1", "!12345678"]},
        {"node_id": "!87654321", "status": "timeout", "rtt": None,
         "hops_to": None, "hops_back": None, "snr": None,
         "route": []},
    ]

    analysis_issues = [
        "Topology: High Router Density! Best positioned seems to be Test Router 1",
        "Config: Network Size exceeds recommendations",
    ]

    def router_entry(name, neighbors, routers_nearby, ch_util, relay_count):
        return {"name": name, "role": "ROUTER", "neighbors": neighbors,
                "routers_nearby": routers_nearby, "ch_util": ch_util,
                "relay_count": relay_count, "status": "Active", "radius": 2000}

    router_stats = [
        router_entry("Test Router 1", 5, 2, 15.5, 12),
        router_entry("Test Router 2", 3, 1, 8.2, 5),
    ]

    # Local node has no deviceMetrics, matching what the monitor stores.
    local_node = {
        "user": {"id": "!local", "longName": "Local Node", "shortName": "LN"},
        "position": {"latitude": 59.4360, "longitude": 24.7520},
    }

    config = {
        "log_level": "info",
        "traceroute_timeout": 60,
        "router_density_threshold": 2000,
        "analysis_mode": "distance",
    }

    return nodes, test_results, analysis_issues, router_stats, local_node, config
def test_report_generation():
    """Test that reports are generated in the reports/ folder with JSON.

    Generates a report from mock data, then verifies that both the
    markdown file and its JSON side-car exist and that the JSON has the
    expected session/data structure.

    Returns:
        The JSON file path on success, or False on any failure.
    """
    print("=" * 60)
    print("Testing Report Generation with JSON Persistence")
    print("=" * 60)

    # Create mock data
    nodes, test_results, analysis_issues, router_stats, local_node, config = create_mock_data()

    # Create reporter
    reporter = NetworkReporter(report_dir="reports", config=config)
    print("\n✅ NetworkReporter created successfully")
    print(f" Report directory: reports/")
    print(f" Config passed: Yes")

    # Generate report
    print("\n📝 Generating report...")
    report_path = reporter.generate_report(
        nodes=nodes,
        test_results=test_results,
        analysis_issues=analysis_issues,
        local_node=local_node,
        router_stats=router_stats
    )
    if report_path:
        print(f"✅ Report generated: {report_path}")
        # Check if markdown report exists
        if os.path.exists(report_path):
            print(f"✅ Markdown file exists: {report_path}")
            # Get file size
            size_kb = os.path.getsize(report_path) / 1024
            print(f" File size: {size_kb:.2f} KB")
        else:
            print(f"❌ Markdown file NOT found: {report_path}")
            return False
        # Check if JSON file exists.
        # NOTE(review): assumes the reporter names the JSON by swapping
        # the .md suffix -- confirm against NetworkReporter.generate_report.
        json_path = report_path.replace('.md', '.json')
        if os.path.exists(json_path):
            print(f"✅ JSON file exists: {json_path}")
            # Get file size
            size_kb = os.path.getsize(json_path) / 1024
            print(f" File size: {size_kb:.2f} KB")
            # Verify JSON structure
            print("\n🔍 Verifying JSON structure...")
            with open(json_path, 'r') as f:
                data = json.load(f)
            # Check session metadata
            if 'session' in data:
                print("✅ Session metadata present")
                session = data['session']
                print(f" Timestamp: {session.get('timestamp', 'N/A')}")
                print(f" Generated at: {session.get('generated_at', 'N/A')}")
                print(f" Config keys: {len(session.get('config', {}))}")
            else:
                print("❌ Session metadata missing")
                return False
            # Check data section
            if 'data' in data:
                print("✅ Data section present")
                data_section = data['data']
                print(f" Nodes: {len(data_section.get('nodes', {}))}")
                print(f" Test results: {len(data_section.get('test_results', []))}")
                print(f" Analysis issues: {len(data_section.get('analysis_issues', []))}")
                print(f" Router stats: {len(data_section.get('router_stats', []))}")
                print(f" Local node: {'present' if data_section.get('local_node') else 'missing'}")
            else:
                print("❌ Data section missing")
                return False
            # Success: hand the JSON path to the regeneration test.
            return json_path
        else:
            print(f"❌ JSON file NOT found: {json_path}")
            return False
    else:
        print("❌ Report generation failed")
        return False
def test_report_regeneration(json_path):
    """Test report regeneration from JSON file.

    Runs report_generate.generate_report_from_json against *json_path*
    with a custom output location and checks the markdown appears; when
    the original markdown is also present, compares file sizes as a
    sanity check.

    Args:
        json_path: Path to the JSON data file produced by the
            generation test.

    Returns:
        True when the regenerated markdown exists, False otherwise.
    """
    print("\n" + "=" * 60)
    print("Testing Report Regeneration from JSON")
    print("=" * 60)

    if not json_path or not os.path.exists(json_path):
        print(f"❌ JSON file not found: {json_path}")
        return False

    # Import the report generator (lazy, so the generation test can run
    # even if this sibling module is absent).
    from report_generate import generate_report_from_json

    print(f"\n📁 Source JSON: {json_path}")

    # Test regeneration with custom output
    custom_output = "reports/test-regenerated-report.md"
    print(f"🔄 Regenerating report to: {custom_output}")
    result = generate_report_from_json(json_path, custom_output)

    if result and os.path.exists(custom_output):
        print(f"✅ Report regenerated successfully: {custom_output}")
        # Compare sizes (should be similar)
        original_md = json_path.replace('.json', '.md')
        if os.path.exists(original_md):
            orig_size = os.path.getsize(original_md)
            regen_size = os.path.getsize(custom_output)
            print(f" Original size: {orig_size / 1024:.2f} KB")
            print(f" Regenerated size: {regen_size / 1024:.2f} KB")
            # They should be roughly the same size (within 10%).
            # BUG FIX: guard against a zero-byte original report, which
            # previously raised ZeroDivisionError here.
            if orig_size and abs(orig_size - regen_size) / orig_size < 0.1:
                print("✅ Size comparison: PASS (within 10%)")
            else:
                print("⚠️ Size comparison: Different (this is OK if content differs)")
        return True
    else:
        print(f"❌ Report regeneration failed")
        return False
def main():
    """Run both verification tests, exiting non-zero on first failure."""
    banner = "=" * 60
    print("\n🧪 REPORT GENERATION REFACTORING - VERIFICATION TESTS\n")

    # Test 1: Report generation with JSON persistence.
    json_path = test_report_generation()
    if not json_path:
        print("\n❌ FAILED: Report generation test")
        sys.exit(1)

    # Test 2: Report regeneration from the JSON produced by test 1.
    if not test_report_regeneration(json_path):
        print("\n❌ FAILED: Report regeneration test")
        sys.exit(1)

    print("\n" + banner)
    print("✅ ALL TESTS PASSED!")
    print(banner)
    closing_lines = (
        "\nSummary:",
        " ✓ Reports are generated in reports/ folder",
        " ✓ JSON files are created alongside markdown reports",
        " ✓ JSON contains all session metadata and raw data",
        " ✓ report_generate.py successfully regenerates reports from JSON",
        "\nNext steps:",
        " - Clean up test files if needed",
        " - Test with real data from the monitor",
    )
    for line in closing_lines:
        print(line)


if __name__ == "__main__":
    main()