Add GPS location configuration and diagnostics stream

- Introduced options for using GPS coordinates for repeater location fields in config.
- Implemented precision control for GPS coordinates.
- Added a new API endpoint for a Server-Sent Events stream of GPS diagnostics.
- Updated GPSService to handle new configuration options and fallback logic.
- Enhanced unit tests for GPS location handling.
This commit is contained in:
Rightup
2026-04-28 23:22:52 +01:00
parent 42b4bbd9e9
commit 76a9785218
7 changed files with 234 additions and 1 deletions
+10
View File
@@ -137,6 +137,16 @@ gps:
# The default 0,0 repeater location is treated as unset.
use_manual_location_until_fix: true
# Opt-in: use GPS coordinates for repeater-originated location fields
# (for example flood adverts). When disabled, repeater.latitude/longitude
# from config are always used.
use_gps_for_repeater_location: false
# Optional privacy/obfuscation control when GPS is used for repeater
# location. If set, coordinates are rounded to this many decimal places
# before use (0-8). Leave null for full reported precision.
repeater_location_precision_digits: null
# Source type:
# serial = read directly from an attached GPS module
# file = read NMEA lines from source_path, useful for gpsd/sidecar bridges
+2
View File
@@ -83,6 +83,8 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
config["gps"] = {
"enabled": False,
"use_manual_location_until_fix": True,
"use_gps_for_repeater_location": False,
"repeater_location_precision_digits": None,
"source": "serial",
"device": "/dev/serial0",
"baud_rate": 9600,
+68
View File
@@ -72,6 +72,13 @@ def _is_zero_coordinate(latitude: Optional[float], longitude: Optional[float]) -
return latitude == 0.0 and longitude == 0.0
def _normalize_precision_digits(value: Any) -> Optional[int]:
digits = _to_int(value)
if digits is None:
return None
return max(0, min(8, digits))
def _nmea_checksum(payload: str) -> int:
checksum = 0
for char in payload:
@@ -582,6 +589,12 @@ class GPSService:
self.use_manual_location_until_fix = bool(
gps_config.get("use_manual_location_until_fix", True)
)
self.use_gps_for_repeater_location = bool(
gps_config.get("use_gps_for_repeater_location", False)
)
self.repeater_location_precision_digits = _normalize_precision_digits(
gps_config.get("repeater_location_precision_digits")
)
self.source = str(gps_config.get("source", "serial")).lower()
self.device = gps_config.get("device", "/dev/serial0")
self.baud_rate = int(gps_config.get("baud_rate", 9600))
@@ -673,8 +686,60 @@ class GPSService:
"in_view_count": snapshot["satellites"].get("in_view_count"),
"snr": snapshot["satellites"].get("snr"),
},
"repeater_location": snapshot.get("repeater_location"),
}
def _apply_precision(self, value: Optional[float]) -> Optional[float]:
if value is None:
return None
if self.repeater_location_precision_digits is None:
return value
return round(value, self.repeater_location_precision_digits)
def _resolve_repeater_location(self, snapshot: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
fallback_lat = _to_float(self.repeater_config.get("latitude"))
fallback_lon = _to_float(self.repeater_config.get("longitude"))
fallback_location = {
"latitude": fallback_lat if fallback_lat is not None else 0.0,
"longitude": fallback_lon if fallback_lon is not None else 0.0,
"source": "config",
"gps_enabled_for_repeater_location": self.use_gps_for_repeater_location,
"precision_digits": self.repeater_location_precision_digits,
}
if not self.use_gps_for_repeater_location:
return fallback_location
snapshot = snapshot or {}
gps_fix_valid = bool(snapshot.get("status", {}).get("fix_valid"))
gps_position = snapshot.get("gps_position") or {}
gps_lat = _to_float(gps_position.get("latitude"))
gps_lon = _to_float(gps_position.get("longitude"))
if gps_fix_valid and _is_valid_latitude(gps_lat) and _is_valid_longitude(gps_lon):
return {
"latitude": self._apply_precision(gps_lat),
"longitude": self._apply_precision(gps_lon),
"source": "gps",
"gps_enabled_for_repeater_location": True,
"precision_digits": self.repeater_location_precision_digits,
}
return {
**fallback_location,
"source": "config_fallback_no_valid_gps_fix",
}
def get_repeater_location(self) -> Dict[str, Any]:
"""Return coordinates used for repeater-originated location fields.
This is intentionally opt-in for GPS-derived coordinates so deployments
can keep static site coordinates unless explicitly configured.
"""
snapshot = self.parser.snapshot()
self._apply_effective_position(snapshot)
return self._resolve_repeater_location(snapshot)
def get_snapshot(self) -> Dict[str, Any]:
snapshot = self.parser.snapshot()
self._apply_effective_position(snapshot)
@@ -690,8 +755,11 @@ class GPSService:
"read_timeout_seconds": self.read_timeout_seconds,
"poll_interval_seconds": self.poll_interval_seconds,
"stale_after_seconds": self.parser.stale_after_seconds,
"use_gps_for_repeater_location": self.use_gps_for_repeater_location,
"repeater_location_precision_digits": self.repeater_location_precision_digits,
},
"time_sync": self._get_time_sync_status(snapshot),
"repeater_location": self._resolve_repeater_location(snapshot),
}
)
if not self.enabled:
+14 -1
View File
@@ -1004,6 +1004,13 @@ class RepeaterDaemon:
node_name = repeater_config.get("node_name", "Repeater")
latitude = repeater_config.get("latitude", 0.0)
longitude = repeater_config.get("longitude", 0.0)
location_source = "config"
if self.gps_service:
location = self.gps_service.get_repeater_location()
latitude = location.get("latitude", latitude)
longitude = location.get("longitude", longitude)
location_source = str(location.get("source", location_source))
flags = ADVERT_FLAG_IS_REPEATER | ADVERT_FLAG_HAS_NAME
@@ -1026,7 +1033,13 @@ class RepeaterDaemon:
self.repeater_handler.mark_seen(packet)
logger.debug("Marked own advert as seen in duplicate cache")
logger.info(f"Sent flood advert '{node_name}' at ({latitude: .6f}, {longitude: .6f})")
logger.info(
"Sent flood advert '%s' at (% .6f, % .6f) source=%s",
node_name,
latitude,
longitude,
location_source,
)
return True
except Exception as e:
+50
View File
@@ -42,6 +42,7 @@ logger = logging.getLogger("HTTPServer")
# System
# GET /api/stats - Get system statistics
# GET /api/gps - Get local GPS diagnostics and parsed NMEA attributes
# GET /api/gps_stream - GPS diagnostics SSE stream
# GET /api/logs - Get system logs
# GET /api/hardware_stats - Get hardware statistics
# GET /api/hardware_processes - Get process information
@@ -700,6 +701,55 @@ class APIEndpoints:
logger.error(f"Error serving GPS diagnostics: {e}", exc_info=True)
return self._error(e)
@cherrypy.expose
def gps_stream(self):
"""Server-Sent Events stream for GPS diagnostics snapshots."""
cherrypy.response.headers["Content-Type"] = "text/event-stream"
cherrypy.response.headers["Cache-Control"] = "no-cache"
cherrypy.response.headers["Connection"] = "keep-alive"
def generate():
last_snapshot_json: Optional[str] = None
last_keepalive = time.time()
try:
yield (
f"data: {json.dumps({'type': 'connected', 'message': 'Connected to GPS stream'})}"
"\n\n"
)
while True:
response = self.gps()
if response.get("success"):
snapshot = response.get("data")
snapshot_json = json.dumps(snapshot, sort_keys=True, default=str)
if snapshot_json != last_snapshot_json:
yield (
f"data: {json.dumps({'type': 'snapshot', 'data': snapshot})}"
"\n\n"
)
last_snapshot_json = snapshot_json
last_keepalive = time.time()
elif (time.time() - last_keepalive) >= 15:
yield f"data: {json.dumps({'type': 'keepalive'})}\n\n"
last_keepalive = time.time()
else:
yield (
f"data: {json.dumps({'type': 'error', 'error': response.get('error', 'GPS stream error')})}"
"\n\n"
)
time.sleep(1.0)
except GeneratorExit:
logger.debug("GPS SSE stream closed by client")
except Exception as exc:
logger.error(f"GPS SSE stream error: {exc}", exc_info=True)
return generate()
gps_stream._cp_config = {"response.stream": True}
@cherrypy.expose
@cherrypy.tools.json_out()
def send_advert(self):
+13
View File
@@ -411,6 +411,19 @@ paths:
raw_attributes:
type: object
/gps_stream:
get:
tags: [GPS]
summary: GPS diagnostics SSE stream
description: Server-Sent Events stream of live GPS diagnostics snapshots.
responses:
'200':
description: SSE stream
content:
text/event-stream:
schema:
type: string
/send_advert:
post:
tags: [System]
+77
View File
@@ -346,3 +346,80 @@ def test_gps_service_reflects_runtime_manual_location_updates():
assert snapshot["gps_position"]["latitude"] == 42.83538333
assert snapshot["gps_position"]["longitude"] == -71.1076
assert snapshot["position_meta"]["source"] == "manual_config"
def test_repeater_location_uses_config_when_gps_opt_in_disabled():
service = GPSService(
{
"repeater": {
"latitude": 42.123456,
"longitude": -71.654321,
},
"gps": {
"enabled": True,
"use_gps_for_repeater_location": False,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,")
)
location = service.get_repeater_location()
assert location["source"] == "config"
assert location["latitude"] == 42.123456
assert location["longitude"] == -71.654321
def test_repeater_location_uses_gps_with_optional_precision_rounding():
service = GPSService(
{
"repeater": {
"latitude": 42.123456,
"longitude": -71.654321,
},
"gps": {
"enabled": True,
"use_gps_for_repeater_location": True,
"repeater_location_precision_digits": 3,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,")
)
location = service.get_repeater_location()
assert location["source"] == "gps"
assert location["latitude"] == 42.835
assert location["longitude"] == -71.108
assert location["precision_digits"] == 3
def test_repeater_location_falls_back_to_config_without_valid_gps_fix():
service = GPSService(
{
"repeater": {
"latitude": 42.123456,
"longitude": -71.654321,
},
"gps": {
"enabled": True,
"use_gps_for_repeater_location": True,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
)
location = service.get_repeater_location()
assert location["source"] == "config_fallback_no_valid_gps_fix"
assert location["latitude"] == 42.123456
assert location["longitude"] == -71.654321