Make GPS location persistence opt-in and fuzzed

This commit is contained in:
Mitchell Moss
2026-04-29 10:45:53 -04:00
parent da8f83964a
commit d83cb61fe7
4 changed files with 52 additions and 17 deletions
+6 -4
View File
@@ -176,10 +176,12 @@ gps:
time_sync_min_offset_seconds: 1.0
time_sync_min_valid_year: 2020
# Feed a valid GPS fix back into repeater.latitude/repeater.longitude so
# pyMC Repeater adverts and pyMC Console location details follow the receiver.
# Updates are throttled to avoid rewriting config on every NMEA sentence.
update_repeater_location_from_fix: true
# Opt-in: feed a valid GPS fix back into repeater.latitude/repeater.longitude
# so pyMC Repeater adverts and pyMC Console location details follow the
# receiver. repeater_location_precision_digits is applied before saving so
# deployments can fuzz the stored/advertised location. Updates are throttled
# to avoid rewriting config on every NMEA sentence.
update_repeater_location_from_fix: false
location_update_interval_seconds: 600.0
# Mesh Network Configuration
+1 -1
View File
@@ -98,7 +98,7 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"time_sync_interval_seconds": 3600.0,
"time_sync_min_offset_seconds": 1.0,
"time_sync_min_valid_year": 2020,
"update_repeater_location_from_fix": True,
"update_repeater_location_from_fix": False,
"location_update_interval_seconds": 600.0,
}
+15 -10
View File
@@ -619,10 +619,7 @@ class GPSService:
self._clock_setter = clock_setter or _set_system_clock_from_datetime
self._time_provider = time_provider or time.time
self.location_update_enabled = bool(
gps_config.get(
"update_repeater_location_from_fix",
gps_config.get("use_gps_for_repeater_location", True),
)
gps_config.get("update_repeater_location_from_fix", False)
)
self.location_update_interval_seconds = max(
1.0, float(gps_config.get("location_update_interval_seconds", 600.0))
@@ -879,22 +876,30 @@ class GPSService:
if _is_zero_coordinate(latitude, longitude):
return
effective_latitude = self._apply_precision(latitude)
effective_longitude = self._apply_precision(longitude)
if not _is_valid_latitude(effective_latitude) or not _is_valid_longitude(
effective_longitude
):
return
self._last_location_update_monotonic = now_monotonic
payload = {
"latitude": latitude,
"longitude": longitude,
"latitude": effective_latitude,
"longitude": effective_longitude,
"altitude_m": _to_float(position.get("altitude_m")),
"fix": deepcopy(snapshot.get("fix") or {}),
"status": deepcopy(snapshot.get("status") or {}),
"time": deepcopy(snapshot.get("time") or {}),
"precision_digits": self.repeater_location_precision_digits,
}
try:
updated = bool(self._location_update_callback(payload))
except Exception as exc:
self._record_location_update_status(
state="error",
latitude=latitude,
longitude=longitude,
latitude=effective_latitude,
longitude=effective_longitude,
error=f"{type(exc).__name__}: {exc}",
)
logger.warning("GPS repeater location update failed: %s", exc)
@@ -902,8 +907,8 @@ class GPSService:
self._record_location_update_status(
state="updated" if updated else "skipped",
latitude=latitude,
longitude=longitude,
latitude=effective_latitude,
longitude=effective_longitude,
success=updated,
)
+30 -2
View File
@@ -464,6 +464,7 @@ def test_gps_service_updates_repeater_location_from_valid_fix(monkeypatch):
"gps": {
"enabled": True,
"time_sync_enabled": False,
"update_repeater_location_from_fix": True,
"location_update_interval_seconds": 600.0,
}
},
@@ -503,6 +504,7 @@ def test_gps_service_does_not_update_repeater_location_without_valid_fix():
"gps": {
"enabled": True,
"time_sync_enabled": False,
"update_repeater_location_from_fix": True,
}
},
location_update_callback=lambda payload: location_updates.append(payload) or True,
@@ -516,14 +518,13 @@ def test_gps_service_does_not_update_repeater_location_without_valid_fix():
assert service.get_snapshot()["location_update"]["state"] == "waiting_for_fix"
def test_gps_service_honors_legacy_use_gps_for_repeater_location_flag():
def test_gps_service_location_update_is_opt_in_by_default():
location_updates = []
service = GPSService(
{
"gps": {
"enabled": True,
"time_sync_enabled": False,
"use_gps_for_repeater_location": False,
}
},
location_update_callback=lambda payload: location_updates.append(payload) or True,
@@ -535,3 +536,30 @@ def test_gps_service_honors_legacy_use_gps_for_repeater_location_flag():
assert location_updates == []
assert service.get_snapshot()["location_update"]["state"] == "disabled"
def test_gps_service_fuzzes_persisted_repeater_location():
location_updates = []
service = GPSService(
{
"gps": {
"enabled": True,
"time_sync_enabled": False,
"update_repeater_location_from_fix": True,
"repeater_location_precision_digits": 3,
}
},
location_update_callback=lambda payload: location_updates.append(payload) or True,
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,")
)
assert len(location_updates) == 1
assert location_updates[0]["latitude"] == 42.835
assert location_updates[0]["longitude"] == -71.108
assert location_updates[0]["precision_digits"] == 3
snapshot = service.get_snapshot()
assert snapshot["location_update"]["last_latitude"] == 42.835
assert snapshot["location_update"]["last_longitude"] == -71.108