Refine CAD calibration logic for improved sensitivity testing and results reporting

This commit is contained in:
Lloyd
2025-11-04 16:10:26 +00:00
parent 570629320d
commit 3306492ee2

View File

@@ -58,38 +58,83 @@ class CADCalibrationEngine:
self.calibration_thread = None
def get_test_ranges(self, spreading_factor: int):
    """Get CAD (det_peak, det_min) test ranges for a spreading factor.

    Returns a tuple ``(det_peak_range, det_min_range)`` of threshold values
    to sweep during calibration. Unknown spreading factors fall back to the
    SF8 ranges.
    """
    # Higher values = less sensitive, lower values = more sensitive
    # Test from LESS sensitive to MORE sensitive to find the sweet spot
    sf_ranges = {
        7: (range(22, 30, 1), range(12, 20, 1)),
        8: (range(22, 30, 1), range(12, 20, 1)),
        9: (range(24, 32, 1), range(14, 22, 1)),
        10: (range(26, 34, 1), range(16, 24, 1)),
        11: (range(28, 36, 1), range(18, 26, 1)),
        12: (range(30, 38, 1), range(20, 28, 1)),
    }
    # Default to the SF8 ranges for out-of-table spreading factors
    return sf_ranges.get(spreading_factor, sf_ranges[8])
async def test_cad_config(self, radio, det_peak: int, det_min: int, samples: int = 20) -> Dict[str, Any]:
    """Test a CAD configuration with proper spacing and a baseline measurement.

    First samples a deliberately insensitive configuration to estimate the
    false-positive (noise) floor, then samples the requested configuration
    and reports a baseline-adjusted detection rate plus a sensitivity score.

    Args:
        radio: radio object exposing ``perform_cad(det_peak, det_min, timeout)``.
        det_peak: CAD detection-peak threshold under test.
        det_min: CAD detection-min threshold under test.
        samples: number of CAD probes for the configuration under test.

    Returns:
        Dict with the tested thresholds, raw/baseline/adjusted detection
        rates (percent), and a ``sensitivity_score``.
    """
    detections = 0
    baseline_detections = 0
    # First, get baseline with very insensitive settings (should detect nothing)
    baseline_samples = 5
    for _ in range(baseline_samples):
        try:
            # Use very high thresholds that should detect nothing
            baseline_result = await radio.perform_cad(det_peak=35, det_min=25, timeout=0.3)
            if baseline_result:
                baseline_detections += 1
        except Exception:
            # Best-effort probe; a failed CAD attempt simply counts as no detection
            pass
        await asyncio.sleep(0.1)  # 100ms between baseline samples
    # Wait before actual test
    await asyncio.sleep(0.5)
    # Now test the actual configuration
    for i in range(samples):
        try:
            result = await radio.perform_cad(det_peak=det_peak, det_min=det_min, timeout=0.3)
            if result:
                detections += 1
        except Exception:
            pass
        # Variable delay to avoid sampling artifacts
        delay = 0.05 + (i % 3) * 0.05  # 50ms, 100ms, 150ms rotation
        await asyncio.sleep(delay)
    # Calculate adjusted detection rate; guard against samples == 0
    baseline_rate = (baseline_detections / baseline_samples) * 100
    detection_rate = (detections / samples) * 100 if samples > 0 else 0.0
    # Subtract baseline noise
    adjusted_rate = max(0, detection_rate - baseline_rate)
    return {
        'det_peak': det_peak,
        'det_min': det_min,
        'samples': samples,
        'detections': detections,
        'detection_rate': detection_rate,
        'baseline_rate': baseline_rate,
        'adjusted_rate': adjusted_rate,  # This is the useful metric
        'sensitivity_score': self._calculate_sensitivity_score(det_peak, det_min, adjusted_rate)
    }
def _calculate_sensitivity_score(self, det_peak: int, det_min: int, adjusted_rate: float) -> float:
"""Calculate a sensitivity score - higher is better balance"""
# Ideal detection rate is around 10-30% for good sensitivity without false positives
ideal_rate = 20.0
rate_penalty = abs(adjusted_rate - ideal_rate) / ideal_rate
# Prefer moderate sensitivity settings (not too extreme)
sensitivity_penalty = (abs(det_peak - 25) + abs(det_min - 15)) / 20.0
# Lower penalty = higher score
score = max(0, 100 - (rate_penalty * 50) - (sensitivity_penalty * 20))
return score
def broadcast_to_clients(self, data):
"""Send data to all connected SSE clients"""
# Store the message for clients to pick up
@@ -217,15 +262,29 @@ class CADCalibrationEngine:
time.sleep(delay_ms / 1000.0)
if self.running:
# Find best result
# Find best result based on sensitivity score (not just detection rate)
best_result = None
recommended_result = None
if self.results:
best_result = max(self.results.values(), key=lambda x: x['detection_rate'])
# Find result with highest sensitivity score (best balance)
best_result = max(self.results.values(), key=lambda x: x.get('sensitivity_score', 0))
# Also find result with ideal adjusted detection rate (10-30%)
ideal_results = [r for r in self.results.values() if 10 <= r.get('adjusted_rate', 0) <= 30]
if ideal_results:
# Among ideal results, pick the one with best sensitivity score
recommended_result = max(ideal_results, key=lambda x: x.get('sensitivity_score', 0))
else:
recommended_result = best_result
self.broadcast_to_clients({
"type": "completed",
"message": "Calibration completed",
"results": {"best": best_result} if best_result else None
"results": {
"best": best_result,
"recommended": recommended_result,
"total_tests": len(self.results)
} if best_result else None
})
else:
self.broadcast_to_clients({"type": "status", "message": "Calibration stopped"})