diff --git a/repeater/http_server.py b/repeater/http_server.py index 9615a2c..02de9ca 100644 --- a/repeater/http_server.py +++ b/repeater/http_server.py @@ -58,38 +58,83 @@ class CADCalibrationEngine: self.calibration_thread = None def get_test_ranges(self, spreading_factor: int): - """Get CAD test ranges based on spreading factor - comprehensive coverage""" + """Get CAD test ranges""" + # Higher values = less sensitive, lower values = more sensitive + # Test from LESS sensitive to MORE sensitive to find the sweet spot sf_ranges = { - 7: (range(17, 26, 1), range(7, 15, 1)), # Full range coverage - 8: (range(17, 26, 1), range(7, 15, 1)), # Full range coverage - 9: (range(19, 28, 1), range(8, 16, 1)), # Full range coverage - 10: (range(21, 30, 1), range(9, 17, 1)), # Full range coverage - 11: (range(23, 32, 1), range(10, 18, 1)), # Full range coverage - 12: (range(25, 34, 1), range(11, 19, 1)), # Full range coverage + 7: (range(22, 30, 1), range(12, 20, 1)), + 8: (range(22, 30, 1), range(12, 20, 1)), + 9: (range(24, 32, 1), range(14, 22, 1)), + 10: (range(26, 34, 1), range(16, 24, 1)), + 11: (range(28, 36, 1), range(18, 26, 1)), + 12: (range(30, 38, 1), range(20, 28, 1)), } return sf_ranges.get(spreading_factor, sf_ranges[8]) - async def test_cad_config(self, radio, det_peak: int, det_min: int, samples: int = 8) -> Dict[str, Any]: - """Test a single CAD configuration with multiple samples""" + async def test_cad_config(self, radio, det_peak: int, det_min: int, samples: int = 20) -> Dict[str, Any]: + """Test CAD configuration with proper spacing and baseline measurement""" detections = 0 + baseline_detections = 0 - for _ in range(samples): + # First, get baseline with very insensitive settings (should detect nothing) + baseline_samples = 5 + for _ in range(baseline_samples): + try: + # Use very high thresholds that should detect nothing + baseline_result = await radio.perform_cad(det_peak=35, det_min=25, timeout=0.3) + if baseline_result: + baseline_detections += 1 + except Exception: + pass + await asyncio.sleep(0.1) # 100ms between baseline samples + + # Wait before actual test + await asyncio.sleep(0.5) + + # Now test the actual configuration + for i in range(samples): try: result = await radio.perform_cad(det_peak=det_peak, det_min=det_min, timeout=0.3) if result: detections += 1 except Exception: pass - await asyncio.sleep(0.01) # Reduced sleep time + + # Variable delay to avoid sampling artifacts + delay = 0.05 + (i % 3) * 0.05 # 50ms, 100ms, 150ms rotation + await asyncio.sleep(delay) + + # Calculate adjusted detection rate + baseline_rate = (baseline_detections / baseline_samples) * 100 + detection_rate = (detections / samples) * 100 + + # Subtract baseline noise + adjusted_rate = max(0, detection_rate - baseline_rate) return { 'det_peak': det_peak, 'det_min': det_min, - 'samples': samples, + 'samples': samples, 'detections': detections, - 'detection_rate': (detections / samples) * 100, + 'detection_rate': detection_rate, + 'baseline_rate': baseline_rate, + 'adjusted_rate': adjusted_rate, # This is the useful metric + 'sensitivity_score': self._calculate_sensitivity_score(det_peak, det_min, adjusted_rate) } + def _calculate_sensitivity_score(self, det_peak: int, det_min: int, adjusted_rate: float) -> float: + """Calculate a sensitivity score - higher is better balance""" + # Ideal detection rate is around 10-30% for good sensitivity without false positives + ideal_rate = 20.0 + rate_penalty = abs(adjusted_rate - ideal_rate) / ideal_rate + + # Prefer moderate sensitivity settings (not too extreme) + sensitivity_penalty = (abs(det_peak - 25) + abs(det_min - 15)) / 20.0 + + # Lower penalty = higher score + score = max(0, 100 - (rate_penalty * 50) - (sensitivity_penalty * 20)) + return score + def broadcast_to_clients(self, data): """Send data to all connected SSE clients""" # Store the message for clients to pick up @@ -217,15 +262,29 @@ class CADCalibrationEngine: time.sleep(delay_ms / 1000.0) if self.running: - # Find best result + # Find best result based on sensitivity score (not just detection rate) best_result = None + recommended_result = None if self.results: - best_result = max(self.results.values(), key=lambda x: x['detection_rate']) + # Find result with highest sensitivity score (best balance) + best_result = max(self.results.values(), key=lambda x: x.get('sensitivity_score', 0)) + + # Also find result with ideal adjusted detection rate (10-30%) + ideal_results = [r for r in self.results.values() if 10 <= r.get('adjusted_rate', 0) <= 30] + if ideal_results: + # Among ideal results, pick the one with best sensitivity score + recommended_result = max(ideal_results, key=lambda x: x.get('sensitivity_score', 0)) + else: + recommended_result = best_result self.broadcast_to_clients({ "type": "completed", "message": "Calibration completed", - "results": {"best": best_result} if best_result else None + "results": { + "best": best_result, + "recommended": recommended_result, + "total_tests": len(self.results) + } if best_result else None }) else: self.broadcast_to_clients({"type": "status", "message": "Calibration stopped"})