feat: add GPS diagnostics web UI

This commit is contained in:
Mitchell Moss
2026-04-23 18:54:19 -04:00
parent 37cd137bbb
commit 18300cbf42
11 changed files with 2512 additions and 1 deletions

View File

@@ -126,6 +126,39 @@ repeater:
# Controls how long users stay logged in before needing to re-authenticate
jwt_expiry_minutes: 60
# Local GPS diagnostics. When enabled, the daemon reads NMEA sentences from the
# configured source and exposes parsed attributes at /api/gps and /gps.
gps:
enabled: false
# Use repeater.latitude/repeater.longitude as the displayed receiver location
# until the GPS has a valid non-stale fix. This prevents early no-fix GPS
# estimates from moving the diagnostics globe/map away from the configured site.
# The default 0,0 repeater location is treated as unset.
use_manual_location_until_fix: true
# Source type:
# serial = read directly from an attached GPS module
# file = read NMEA lines from source_path, useful for gpsd/sidecar bridges
source: serial
# Serial source settings
device: "/dev/serial0"
baud_rate: 9600
read_timeout_seconds: 1.0
reconnect_interval_seconds: 5.0
# File source settings. The file can contain raw NMEA lines or JSON with a
# "sentences" list / "last_sentence" field.
source_path: "/var/lib/pymc_repeater/gps_nmea.txt"
poll_interval_seconds: 2.0
# Diagnostics behavior
stale_after_seconds: 10.0
retain_sentences: 25
validate_checksum: true
require_checksum: false
# Mesh Network Configuration
mesh:
# Unscoped flood policy - controls whether the repeater allows or denies unscoped flooding

View File

@@ -36,6 +36,7 @@ dependencies = [
"paho-mqtt>=1.6.0",
"cherrypy-cors==1.7.0",
"psutil>=5.9.0",
"pyserial>=3.5",
"pyjwt>=2.8.0",
"ws4py>=0.6.0",
]

View File

@@ -79,6 +79,21 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"cert_store_dir": "/etc/pymc_repeater/glass",
}
if "gps" not in config:
config["gps"] = {
"enabled": False,
"use_manual_location_until_fix": True,
"source": "serial",
"device": "/dev/serial0",
"baud_rate": 9600,
"read_timeout_seconds": 1.0,
"reconnect_interval_seconds": 5.0,
"stale_after_seconds": 10.0,
"retain_sentences": 25,
"validate_checksum": True,
"require_checksum": False,
}
# Ensure repeater.security exists with defaults for upgrades from older configs
if "repeater" not in config:
config["repeater"] = {}

View File

@@ -1,5 +1,6 @@
from .glass_handler import GlassHandler
from .gps_service import GPSService
from .rrdtool_handler import RRDToolHandler
from .sqlite_handler import SQLiteHandler
from .storage_collector import StorageCollector
__all__ = ["SQLiteHandler", "RRDToolHandler", "StorageCollector", "GlassHandler"]
__all__ = ["SQLiteHandler", "RRDToolHandler", "StorageCollector", "GlassHandler", "GPSService"]

View File

@@ -0,0 +1,785 @@
"""
GPS/NMEA acquisition and diagnostics support.
The service intentionally keeps the NMEA parser local and dependency-light. It
accepts raw NMEA sentences from a serial receiver or from a file source used by
external bridge processes, then exposes a JSON-serializable snapshot for the
HTTP API and diagnostics UI.
"""
from __future__ import annotations
import json
import logging
import math
import threading
import time
from collections import Counter, deque
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
logger = logging.getLogger("GPSService")
FIX_QUALITY_LABELS = {
0: "no fix",
1: "GPS",
2: "DGPS",
4: "RTK fixed",
5: "RTK float",
6: "estimated",
7: "manual",
8: "simulation",
}
GSA_FIX_TYPE_LABELS = {
1: "no fix",
2: "2D fix",
3: "3D fix",
}
def _to_float(value: Any) -> Optional[float]:
if value in (None, ""):
return None
try:
result = float(value)
except (TypeError, ValueError):
return None
return result if math.isfinite(result) else None
def _to_int(value: Any) -> Optional[int]:
if value in (None, ""):
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _is_valid_latitude(value: Optional[float]) -> bool:
return value is not None and -90.0 <= value <= 90.0
def _is_valid_longitude(value: Optional[float]) -> bool:
return value is not None and -180.0 <= value <= 180.0
def _is_zero_coordinate(latitude: Optional[float], longitude: Optional[float]) -> bool:
return latitude == 0.0 and longitude == 0.0
def _nmea_checksum(payload: str) -> int:
checksum = 0
for char in payload:
checksum ^= ord(char)
return checksum
def _parse_lat_lon(value: str, hemisphere: str) -> Optional[float]:
if not value or not hemisphere:
return None
dot = value.find(".")
if dot < 0:
dot = len(value)
# Latitude is ddmm.mmmm, longitude is dddmm.mmmm.
degree_digits = dot - 2
if degree_digits <= 0:
return None
try:
degrees = float(value[:degree_digits])
minutes = float(value[degree_digits:])
except ValueError:
return None
decimal = degrees + minutes / 60.0
if hemisphere.upper() in ("S", "W"):
decimal *= -1
return round(decimal, 8)
def _format_time(value: str) -> Optional[str]:
if not value or len(value) < 6:
return None
try:
hour = int(value[0:2])
minute = int(value[2:4])
second_float = float(value[4:])
except ValueError:
return None
second = int(second_float)
microsecond = int(round((second_float - second) * 1_000_000))
if microsecond >= 1_000_000:
second += 1
microsecond -= 1_000_000
try:
return f"{hour:02d}:{minute:02d}:{second:02d}.{microsecond:06d}Z"
except ValueError:
return None
def _format_date(value: str) -> Optional[str]:
if not value or len(value) != 6:
return None
try:
day = int(value[0:2])
month = int(value[2:4])
year_2 = int(value[4:6])
except ValueError:
return None
# NMEA RMC uses two-digit years. GPS modules in this project are modern,
# but keep 1980-2079 rollover behavior for old sample data.
year = 2000 + year_2 if year_2 < 80 else 1900 + year_2
try:
return datetime(year, month, day, tzinfo=timezone.utc).date().isoformat()
except ValueError:
return None
def _combine_datetime_utc(date_value: Optional[str], time_value: Optional[str]) -> Optional[str]:
if not date_value or not time_value:
return None
try:
time_part = time_value.rstrip("Z")
return f"{date_value}T{time_part}+00:00"
except Exception:
return None
class NMEAParser:
"""Small NMEA parser focused on diagnostics fields used by GPS receivers."""
def __init__(
self,
*,
validate_checksum: bool = True,
require_checksum: bool = False,
retain_sentences: int = 25,
stale_after_seconds: float = 10.0,
):
self.validate_checksum = validate_checksum
self.require_checksum = require_checksum
self.retain_sentences = max(0, int(retain_sentences))
self.stale_after_seconds = max(1.0, float(stale_after_seconds))
self._lock = threading.RLock()
self._reset_unlocked()
def _reset_unlocked(self):
self.position: Dict[str, Any] = {
"latitude": None,
"longitude": None,
"altitude_m": None,
"geoid_separation_m": None,
}
self.motion: Dict[str, Any] = {
"speed_knots": None,
"speed_kmh": None,
"course_degrees": None,
"magnetic_variation_degrees": None,
}
self.accuracy: Dict[str, Any] = {
"hdop": None,
"pdop": None,
"vdop": None,
}
self.time_data: Dict[str, Any] = {
"utc_time": None,
"date": None,
"datetime_utc": None,
}
self.fix: Dict[str, Any] = {
"valid": False,
"status": None,
"quality": None,
"quality_label": FIX_QUALITY_LABELS[0],
"gsa_fix_type": None,
"gsa_fix_type_label": None,
}
self.satellites: Dict[str, Any] = {
"used_count": None,
"used_prns": [],
"in_view_count": None,
"in_view": [],
"snr": {
"min": None,
"max": None,
"avg": None,
},
}
self.nmea: Dict[str, Any] = {
"last_sentence": None,
"last_sentence_type": None,
"last_talker": None,
"seen_sentence_types": [],
"sentence_counters": {},
"valid_checksum_count": 0,
"invalid_checksum_count": 0,
"missing_checksum_count": 0,
"recent_sentences": [],
}
self.raw_attributes: Dict[str, Any] = {}
self.last_update = None
self.last_error = None
self._sentence_counters: Counter = Counter()
self._recent_sentences = deque(maxlen=self.retain_sentences)
self._unhandled_sentence_types = set()
def reset(self):
with self._lock:
self._reset_unlocked()
def ingest_many(self, lines: Iterable[str]):
for line in lines:
self.ingest_sentence(line)
def ingest_sentence(self, sentence: str) -> bool:
parsed = self._split_sentence(sentence)
with self._lock:
if parsed is None:
return False
raw, payload, checksum_valid, talker, sentence_type, fields = parsed
if checksum_valid is False:
self.nmea["invalid_checksum_count"] += 1
self.last_error = "NMEA checksum mismatch"
if self.validate_checksum:
return False
elif checksum_valid is True:
self.nmea["valid_checksum_count"] += 1
else:
self.nmea["missing_checksum_count"] += 1
if self.require_checksum:
self.last_error = "NMEA checksum missing"
return False
now = time.time()
self.last_update = now
self.last_error = None
self._sentence_counters[sentence_type] += 1
self.nmea["last_sentence"] = raw
self.nmea["last_sentence_type"] = sentence_type
self.nmea["last_talker"] = talker
self.nmea["seen_sentence_types"] = sorted(self._sentence_counters.keys())
self.nmea["sentence_counters"] = dict(self._sentence_counters)
if self.retain_sentences:
self._recent_sentences.append(
{
"timestamp": datetime.fromtimestamp(now, timezone.utc).isoformat(),
"sentence_type": sentence_type,
"sentence": raw,
}
)
self.nmea["recent_sentences"] = list(self._recent_sentences)
handler = getattr(self, f"_parse_{sentence_type.lower()}", None)
if handler:
handler(fields)
else:
self._unhandled_sentence_types.add(sentence_type)
self.raw_attributes["unhandled_sentence_types"] = sorted(
self._unhandled_sentence_types
)
self._refresh_derived_unlocked()
return True
def _split_sentence(
self, sentence: str
) -> Optional[Tuple[str, str, Optional[bool], str, str, List[str]]]:
raw = (sentence or "").strip()
if not raw:
return None
if raw.startswith("$"):
raw_no_dollar = raw[1:]
else:
raw_no_dollar = raw
payload = raw_no_dollar
checksum_valid: Optional[bool] = None
if "*" in raw_no_dollar:
payload, supplied_checksum = raw_no_dollar.split("*", 1)
supplied_checksum = supplied_checksum[:2]
try:
expected = int(supplied_checksum, 16)
checksum_valid = _nmea_checksum(payload) == expected
except ValueError:
checksum_valid = False
fields = payload.split(",")
if not fields or len(fields[0]) < 3:
return None
sentence_id = fields[0].upper()
talker = sentence_id[:-3] or None
sentence_type = sentence_id[-3:]
return raw, payload, checksum_valid, talker, sentence_type, fields
def _parse_rmc(self, fields: List[str]):
# $GPRMC,time,status,lat,N,lon,E,sog,cog,date,magvar,E/W,...
self.raw_attributes["RMC"] = {
"utc_time_raw": self._field(fields, 1),
"status": self._field(fields, 2),
"date_raw": self._field(fields, 9),
"mode": self._field(fields, 12),
"nav_status": self._field(fields, 13),
}
status = (self._field(fields, 2) or "").upper()
self.fix["status"] = "valid" if status == "A" else "invalid" if status == "V" else status
self.fix["valid"] = status == "A" or bool(self.fix.get("quality"))
latitude = _parse_lat_lon(self._field(fields, 3), self._field(fields, 4))
longitude = _parse_lat_lon(self._field(fields, 5), self._field(fields, 6))
if latitude is not None:
self.position["latitude"] = latitude
if longitude is not None:
self.position["longitude"] = longitude
speed_knots = _to_float(self._field(fields, 7))
if speed_knots is not None:
self.motion["speed_knots"] = speed_knots
self.motion["speed_kmh"] = round(speed_knots * 1.852, 3)
course = _to_float(self._field(fields, 8))
if course is not None:
self.motion["course_degrees"] = course
mag_var = _to_float(self._field(fields, 10))
mag_dir = (self._field(fields, 11) or "").upper()
if mag_var is not None:
self.motion["magnetic_variation_degrees"] = -mag_var if mag_dir == "W" else mag_var
utc_time = _format_time(self._field(fields, 1))
date = _format_date(self._field(fields, 9))
if utc_time:
self.time_data["utc_time"] = utc_time
if date:
self.time_data["date"] = date
self.time_data["datetime_utc"] = _combine_datetime_utc(
self.time_data.get("date"), self.time_data.get("utc_time")
)
def _parse_gga(self, fields: List[str]):
# $GPGGA,time,lat,N,lon,E,quality,num_sats,hdop,alt,M,geoid,M,...
quality = _to_int(self._field(fields, 6))
satellites_used = _to_int(self._field(fields, 7))
self.raw_attributes["GGA"] = {
"utc_time_raw": self._field(fields, 1),
"fix_quality_raw": self._field(fields, 6),
"dgps_age": self._field(fields, 13),
"dgps_station_id": self._field(fields, 14),
}
if quality is not None:
self.fix["quality"] = quality
self.fix["quality_label"] = FIX_QUALITY_LABELS.get(quality, f"quality {quality}")
self.fix["valid"] = quality > 0 or self.fix.get("status") == "valid"
latitude = _parse_lat_lon(self._field(fields, 2), self._field(fields, 3))
longitude = _parse_lat_lon(self._field(fields, 4), self._field(fields, 5))
if latitude is not None:
self.position["latitude"] = latitude
if longitude is not None:
self.position["longitude"] = longitude
if satellites_used is not None:
self.satellites["used_count"] = satellites_used
hdop = _to_float(self._field(fields, 8))
if hdop is not None:
self.accuracy["hdop"] = hdop
altitude = _to_float(self._field(fields, 9))
if altitude is not None:
self.position["altitude_m"] = altitude
geoid_sep = _to_float(self._field(fields, 11))
if geoid_sep is not None:
self.position["geoid_separation_m"] = geoid_sep
utc_time = _format_time(self._field(fields, 1))
if utc_time:
self.time_data["utc_time"] = utc_time
self.time_data["datetime_utc"] = _combine_datetime_utc(
self.time_data.get("date"), self.time_data.get("utc_time")
)
def _parse_gsa(self, fields: List[str]):
# $GPGSA,mode,fix_type,sv1..sv12,pdop,hdop,vdop
fix_type = _to_int(self._field(fields, 2))
used_prns = [value for value in fields[3:15] if value]
self.raw_attributes["GSA"] = {
"mode": self._field(fields, 1),
}
if fix_type is not None:
self.fix["gsa_fix_type"] = fix_type
self.fix["gsa_fix_type_label"] = GSA_FIX_TYPE_LABELS.get(fix_type, f"type {fix_type}")
self.fix["valid"] = fix_type > 1 or self.fix.get("valid", False)
self.satellites["used_prns"] = used_prns
if used_prns:
self.satellites["used_count"] = len(used_prns)
pdop = _to_float(self._field(fields, 15))
hdop = _to_float(self._field(fields, 16))
vdop = _to_float(self._field(fields, 17))
if pdop is not None:
self.accuracy["pdop"] = pdop
if hdop is not None:
self.accuracy["hdop"] = hdop
if vdop is not None:
self.accuracy["vdop"] = vdop
def _parse_gsv(self, fields: List[str]):
# $GPGSV,total_msgs,msg_num,sats_in_view,prn,elev,az,snr,...
total_messages = _to_int(self._field(fields, 1))
message_number = _to_int(self._field(fields, 2))
satellites_in_view = _to_int(self._field(fields, 3))
current = [] if message_number == 1 else list(self.satellites.get("in_view") or [])
for idx in range(4, len(fields), 4):
prn = self._field(fields, idx)
if not prn:
continue
satellite = {
"prn": prn,
"elevation_degrees": _to_int(self._field(fields, idx + 1)),
"azimuth_degrees": _to_int(self._field(fields, idx + 2)),
"snr_db": _to_float(self._field(fields, idx + 3)),
}
current.append(satellite)
# Deduplicate by PRN while preserving last value for each satellite.
by_prn = {sat["prn"]: sat for sat in current}
in_view = sorted(by_prn.values(), key=lambda item: item["prn"])
self.satellites["in_view"] = in_view
self.satellites["in_view_count"] = satellites_in_view
self.raw_attributes["GSV"] = {
"total_messages": total_messages,
"last_message_number": message_number,
}
def _parse_vtg(self, fields: List[str]):
# $GPVTG,cog,T,cog_magnetic,M,sog_knots,N,sog_kmh,K,...
course_true = _to_float(self._field(fields, 1))
course_magnetic = _to_float(self._field(fields, 3))
speed_knots = _to_float(self._field(fields, 5))
speed_kmh = _to_float(self._field(fields, 7))
if course_true is not None:
self.motion["course_degrees"] = course_true
if speed_knots is not None:
self.motion["speed_knots"] = speed_knots
if speed_kmh is not None:
self.motion["speed_kmh"] = speed_kmh
self.raw_attributes["VTG"] = {
"course_magnetic_degrees": course_magnetic,
"mode": self._field(fields, 9),
}
@staticmethod
def _field(fields: List[str], index: int) -> str:
return fields[index] if index < len(fields) else ""
def _normalize_raw_attributes_unlocked(self):
if self._unhandled_sentence_types:
self.raw_attributes["unhandled_sentence_types"] = sorted(self._unhandled_sentence_types)
def _refresh_derived_unlocked(self):
snrs = [
sat.get("snr_db")
for sat in self.satellites.get("in_view", [])
if sat.get("snr_db") is not None
]
if snrs:
self.satellites["snr"] = {
"min": min(snrs),
"max": max(snrs),
"avg": round(sum(snrs) / len(snrs), 3),
}
else:
self.satellites["snr"] = {"min": None, "max": None, "avg": None}
self._normalize_raw_attributes_unlocked()
def snapshot(self) -> Dict[str, Any]:
with self._lock:
now = time.time()
age = now - self.last_update if self.last_update else None
stale = age is None or age > self.stale_after_seconds
fix_valid = bool(self.fix.get("valid")) and not stale
if self.last_error:
state = "error"
elif age is None:
state = "no_data"
elif stale:
state = "stale"
elif fix_valid:
state = "valid_fix"
else:
state = "invalid_fix"
return {
"status": {
"state": state,
"fix_valid": fix_valid,
"stale": stale,
"age_seconds": round(age, 3) if age is not None else None,
"last_update": (
datetime.fromtimestamp(self.last_update, timezone.utc).isoformat()
if self.last_update
else None
),
"last_error": self.last_error,
},
"fix": deepcopy(self.fix),
"position": deepcopy(self.position),
"motion": deepcopy(self.motion),
"accuracy": deepcopy(self.accuracy),
"time": deepcopy(self.time_data),
"satellites": deepcopy(self.satellites),
"nmea": deepcopy(self.nmea),
"raw_attributes": deepcopy(self.raw_attributes),
}
class GPSService:
"""Runtime GPS acquisition service."""
def __init__(self, config: Dict[str, Any]):
gps_config = config.get("gps", {}) if isinstance(config, dict) else {}
repeater_config = config.get("repeater", {}) if isinstance(config, dict) else {}
self.config = gps_config
self.enabled = bool(gps_config.get("enabled", False))
self.use_manual_location_until_fix = bool(
gps_config.get("use_manual_location_until_fix", True)
)
self.source = str(gps_config.get("source", "serial")).lower()
self.device = gps_config.get("device", "/dev/serial0")
self.baud_rate = int(gps_config.get("baud_rate", 9600))
self.read_timeout_seconds = float(gps_config.get("read_timeout_seconds", 1.0))
self.reconnect_interval_seconds = float(gps_config.get("reconnect_interval_seconds", 5.0))
self.poll_interval_seconds = float(gps_config.get("poll_interval_seconds", 2.0))
self.source_path = gps_config.get("source_path") or gps_config.get("snapshot_path")
self.repeater_config = repeater_config
self.parser = NMEAParser(
validate_checksum=bool(gps_config.get("validate_checksum", True)),
require_checksum=bool(gps_config.get("require_checksum", False)),
retain_sentences=int(gps_config.get("retain_sentences", 25)),
stale_after_seconds=float(gps_config.get("stale_after_seconds", 10.0)),
)
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._running = False
self._last_source_error: Optional[str] = None
self._last_file_content: Optional[str] = None
def start(self):
if not self.enabled:
return
if self._thread and self._thread.is_alive():
return
target = self._run_file_loop if self.source == "file" else self._run_serial_loop
self._stop_event.clear()
self._thread = threading.Thread(target=target, name="gps-service", daemon=True)
self._thread.start()
self._running = True
logger.info("GPS service started using %s source", self.source)
def stop(self, timeout: float = 2.0):
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=timeout)
self._running = False
logger.info("GPS service stopped")
def ingest_sentence(self, sentence: str) -> bool:
return self.parser.ingest_sentence(sentence)
def get_summary(self) -> Dict[str, Any]:
snapshot = self.get_snapshot()
return {
"enabled": snapshot["enabled"],
"source": snapshot["source"],
"status": snapshot["status"],
"fix": {
"valid": snapshot["fix"].get("valid"),
"quality": snapshot["fix"].get("quality"),
"quality_label": snapshot["fix"].get("quality_label"),
},
"position": snapshot["position"],
"position_meta": snapshot.get("position_meta"),
"gps_position": snapshot.get("gps_position"),
"manual_position": snapshot.get("manual_position"),
"satellites": {
"used_count": snapshot["satellites"].get("used_count"),
"in_view_count": snapshot["satellites"].get("in_view_count"),
"snr": snapshot["satellites"].get("snr"),
},
}
def get_snapshot(self) -> Dict[str, Any]:
snapshot = self.parser.snapshot()
self._apply_effective_position(snapshot)
snapshot.update(
{
"enabled": self.enabled,
"running": self._running and bool(self._thread and self._thread.is_alive()),
"source": {
"type": self.source,
"device": self.device if self.source == "serial" else None,
"baud_rate": self.baud_rate if self.source == "serial" else None,
"source_path": self.source_path if self.source == "file" else None,
"read_timeout_seconds": self.read_timeout_seconds,
"poll_interval_seconds": self.poll_interval_seconds,
"stale_after_seconds": self.parser.stale_after_seconds,
},
}
)
if not self.enabled:
snapshot["status"]["state"] = "disabled"
snapshot["status"]["last_error"] = None
elif self._last_source_error:
snapshot["status"]["last_error"] = self._last_source_error
if snapshot["status"]["state"] in ("no_data", "stale"):
snapshot["status"]["state"] = "error"
return snapshot
@staticmethod
def _extract_manual_position(repeater_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
latitude = _to_float(repeater_config.get("latitude"))
longitude = _to_float(repeater_config.get("longitude"))
if not _is_valid_latitude(latitude) or not _is_valid_longitude(longitude):
return None
# The packaged default is 0,0. Treat it as unset so a fresh install does
# not show Null Island while the GPS is still searching for a fix.
if _is_zero_coordinate(latitude, longitude):
return None
return {
"latitude": latitude,
"longitude": longitude,
"altitude_m": None,
"geoid_separation_m": None,
"source": "manual_config",
}
def _apply_effective_position(self, snapshot: Dict[str, Any]):
gps_position = deepcopy(snapshot.get("position") or {})
manual_position = deepcopy(self._extract_manual_position(self.repeater_config))
gps_fix_valid = bool(snapshot.get("status", {}).get("fix_valid"))
use_manual = (
self.use_manual_location_until_fix
and manual_position is not None
and not gps_fix_valid
)
if use_manual:
effective_position = {
**gps_position,
"latitude": manual_position["latitude"],
"longitude": manual_position["longitude"],
}
position_source = "manual_config"
position_source_label = "manual config until GPS fix"
else:
effective_position = gps_position
position_source = "gps"
position_source_label = "GPS fix" if gps_fix_valid else "GPS estimate"
snapshot["gps_position"] = gps_position
snapshot["manual_position"] = manual_position
snapshot["position"] = effective_position
snapshot["position_meta"] = {
"source": position_source,
"source_label": position_source_label,
"policy": "manual_until_gps_fix"
if self.use_manual_location_until_fix
else "gps_only",
"manual_config_available": manual_position is not None,
"gps_fix_valid": gps_fix_valid,
}
def _set_source_error(self, message: Optional[str]):
self._last_source_error = message
if message:
self.parser.last_error = message
logger.warning("GPS source error: %s", message)
else:
self.parser.last_error = None
def _run_serial_loop(self):
try:
import serial # type: ignore
except ImportError:
self._set_source_error("pyserial is not installed")
self._running = False
return
while not self._stop_event.is_set():
try:
with serial.Serial(
self.device,
self.baud_rate,
timeout=self.read_timeout_seconds,
) as port:
self._set_source_error(None)
while not self._stop_event.is_set():
line = port.readline()
if not line:
continue
sentence = line.decode("ascii", errors="ignore").strip()
if sentence:
self.ingest_sentence(sentence)
except Exception as exc:
self._set_source_error(f"{type(exc).__name__}: {exc}")
self._stop_event.wait(self.reconnect_interval_seconds)
self._running = False
def _run_file_loop(self):
if not self.source_path:
self._set_source_error("gps.source_path is required for file source")
self._running = False
return
path = Path(self.source_path)
while not self._stop_event.is_set():
try:
content = path.read_text(encoding="utf-8")
if content != self._last_file_content:
self._last_file_content = content
self._set_source_error(None)
lines = self._extract_file_sentences(content)
self.parser.ingest_many(lines)
except FileNotFoundError:
self._set_source_error(f"GPS source file not found: {path}")
except Exception as exc:
self._set_source_error(f"{type(exc).__name__}: {exc}")
self._stop_event.wait(self.poll_interval_seconds)
self._running = False
@staticmethod
def _extract_file_sentences(content: str) -> List[str]:
stripped = content.strip()
if not stripped:
return []
if stripped.startswith("{"):
payload = json.loads(stripped)
if isinstance(payload.get("sentences"), list):
return [str(item) for item in payload["sentences"]]
if payload.get("last_sentence"):
return [str(payload["last_sentence"])]
if payload.get("nmea"):
nmea = payload["nmea"]
if isinstance(nmea, dict) and nmea.get("last_sentence"):
return [str(nmea["last_sentence"])]
if isinstance(nmea, list):
return [str(item) for item in nmea]
return []
return [line.strip() for line in stripped.splitlines() if line.strip()]

View File

@@ -11,6 +11,7 @@ from repeater.companion.utils import validate_companion_node_name, normalize_com
from repeater.config import get_radio_for_board, load_config, save_config
from repeater.config_manager import ConfigManager
from repeater.data_acquisition.glass_handler import GlassHandler
from repeater.data_acquisition.gps_service import GPSService
from repeater.engine import RepeaterHandler
from repeater.handler_helpers import (
AdvertHelper,
@@ -49,6 +50,7 @@ class RepeaterDaemon:
self.path_helper = None
self.protocol_request_helper = None
self.glass_handler = None
self.gps_service = None
self.acl = None
self.router = None
self.companion_bridges: dict[int, object] = {}
@@ -262,6 +264,13 @@ class RepeaterDaemon:
)
logger.info("Config manager initialized")
self.gps_service = GPSService(self.config)
self.gps_service.start()
if self.config.get("gps", {}).get("enabled", False):
logger.info("GPS diagnostics initialized")
else:
logger.info("GPS diagnostics disabled")
# Initialize text message helper with per-identity ACLs
self.text_helper = TextHelper(
identity_manager=self.identity_manager,
@@ -916,6 +925,9 @@ class RepeaterDaemon:
except Exception:
stats["public_key"] = None
if self.gps_service:
stats["gps"] = self.gps_service.get_summary()
return stats
async def _get_companion_stats(self, stats_type: int) -> dict:
@@ -1076,6 +1088,13 @@ class RepeaterDaemon:
except Exception as e:
logger.warning(f"Error stopping Glass handler: {e}")
# Stop GPS diagnostics.
if self.gps_service:
try:
self.gps_service.stop()
except Exception as e:
logger.warning(f"Error stopping GPS diagnostics: {e}")
# Close storage publishers (MQTT/LetsMesh) to stop their worker threads.
try:
if self.repeater_handler and self.repeater_handler.storage:

View File

@@ -41,6 +41,7 @@ logger = logging.getLogger("HTTPServer")
# System
# GET /api/stats - Get system statistics
# GET /api/gps - Get local GPS diagnostics and parsed NMEA attributes
# GET /api/logs - Get system logs
# GET /api/hardware_stats - Get hardware statistics
# GET /api/hardware_processes - Get process information
@@ -628,6 +629,75 @@ class APIEndpoints:
logger.error(f"Error serving stats: {e}")
return {"error": str(e)}
@cherrypy.expose
@cherrypy.tools.json_out()
def gps(self):
"""Get full local GPS diagnostics and parsed NMEA attributes."""
try:
gps_service = getattr(self.daemon_instance, "gps_service", None)
if gps_service:
return self._success(gps_service.get_snapshot())
return self._success(
{
"enabled": False,
"running": False,
"source": self.config.get("gps", {}),
"status": {
"state": "disabled",
"fix_valid": False,
"stale": True,
"age_seconds": None,
"last_update": None,
"last_error": "GPS service is not initialized",
},
"fix": {
"valid": False,
"status": None,
"quality": None,
"quality_label": "no fix",
"gsa_fix_type": None,
"gsa_fix_type_label": None,
},
"position": {
"latitude": None,
"longitude": None,
"altitude_m": None,
"geoid_separation_m": None,
},
"motion": {
"speed_knots": None,
"speed_kmh": None,
"course_degrees": None,
"magnetic_variation_degrees": None,
},
"accuracy": {"hdop": None, "pdop": None, "vdop": None},
"time": {"utc_time": None, "date": None, "datetime_utc": None},
"satellites": {
"used_count": None,
"used_prns": [],
"in_view_count": None,
"in_view": [],
"snr": {"min": None, "max": None, "avg": None},
},
"nmea": {
"last_sentence": None,
"last_sentence_type": None,
"last_talker": None,
"seen_sentence_types": [],
"sentence_counters": {},
"valid_checksum_count": 0,
"invalid_checksum_count": 0,
"missing_checksum_count": 0,
"recent_sentences": [],
},
"raw_attributes": {},
}
)
except Exception as e:
logger.error(f"Error serving GPS diagnostics: {e}", exc_info=True)
return self._error(e)
@cherrypy.expose
@cherrypy.tools.json_out()
def send_advert(self):

1275
repeater/web/html/gps.html Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -152,6 +152,19 @@ class StatsApp:
logger.error(f"Error serving index.html: {e}")
raise cherrypy.HTTPError(500, "Internal server error")
@cherrypy.expose
def gps(self, **kwargs):
"""Serve the standalone GPS diagnostics page."""
gps_path = os.path.join(self.html_dir, "gps.html")
try:
with open(gps_path, "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
raise cherrypy.HTTPError(404, "GPS diagnostics page not found.")
except Exception as e:
logger.error(f"Error serving gps.html: {e}")
raise cherrypy.HTTPError(500, "Internal server error")
@cherrypy.expose
def default(self, *args, **kwargs):
"""Handle client-side routing - serve index.html for all non-API routes."""

View File

@@ -36,6 +36,8 @@ tags:
description: User authentication and API token management
- name: System
description: System statistics and control
- name: GPS
description: Local GPS receiver diagnostics
- name: Packets
description: Packet history and statistics
- name: Charts
@@ -318,6 +320,73 @@ paths:
type: string
example: "0.5.0"
/gps:
get:
tags: [GPS]
summary: Get local GPS diagnostics
description: Returns parsed NMEA fix, position, motion, accuracy, satellites, and raw sentence health.
security:
- BearerAuth: []
- ApiKeyAuth: []
responses:
'200':
description: GPS diagnostics snapshot
content:
application/json:
schema:
type: object
properties:
success:
type: boolean
data:
type: object
properties:
enabled:
type: boolean
running:
type: boolean
status:
type: object
fix:
type: object
position:
type: object
description: Effective receiver position used by the diagnostics UI
gps_position:
type: object
description: Raw position reported by the GPS receiver, even before a valid fix
manual_position:
type: object
nullable: true
description: Configured repeater latitude/longitude, when set to a non-zero coordinate
position_meta:
type: object
properties:
source:
type: string
enum: [gps, manual_config]
source_label:
type: string
policy:
type: string
enum: [manual_until_gps_fix, gps_only]
manual_config_available:
type: boolean
gps_fix_valid:
type: boolean
motion:
type: object
accuracy:
type: object
time:
type: object
satellites:
type: object
nmea:
type: object
raw_attributes:
type: object
/send_advert:
post:
tags: [System]

230
tests/test_gps_service.py Normal file
View File

@@ -0,0 +1,230 @@
import time
import importlib.util
from pathlib import Path
_MODULE_PATH = Path(__file__).resolve().parents[1] / "repeater" / "data_acquisition" / "gps_service.py"
_SPEC = importlib.util.spec_from_file_location("repeater_gps_service", _MODULE_PATH)
_MODULE = importlib.util.module_from_spec(_SPEC)
assert _SPEC and _SPEC.loader
_SPEC.loader.exec_module(_MODULE)
GPSService = _MODULE.GPSService
NMEAParser = _MODULE.NMEAParser
def _sentence(payload: str) -> str:
checksum = 0
for char in payload:
checksum ^= ord(char)
return f"${payload}*{checksum:02X}"
def test_nmea_parser_combines_rmc_gga_gsa_gsv_attributes():
parser = NMEAParser()
assert parser.ingest_sentence(
_sentence("GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W")
)
assert parser.ingest_sentence(
_sentence("GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,")
)
assert parser.ingest_sentence(
_sentence("GPGSA,A,3,04,05,09,12,24,25,29,,,,,,1.8,1.0,1.5")
)
assert parser.ingest_sentence(
_sentence("GPGSV,1,1,03,04,77,045,42,05,13,180,35,09,07,095,29")
)
snapshot = parser.snapshot()
assert snapshot["status"]["state"] == "valid_fix"
assert snapshot["position"]["latitude"] == 48.1173
assert snapshot["position"]["longitude"] == 11.51666667
assert snapshot["position"]["altitude_m"] == 545.4
assert snapshot["motion"]["speed_knots"] == 22.4
assert snapshot["motion"]["speed_kmh"] == 41.485
assert snapshot["motion"]["course_degrees"] == 84.4
assert snapshot["motion"]["magnetic_variation_degrees"] == -3.1
assert snapshot["accuracy"]["hdop"] == 1.0
assert snapshot["accuracy"]["pdop"] == 1.8
assert snapshot["accuracy"]["vdop"] == 1.5
assert snapshot["fix"]["quality"] == 1
assert snapshot["fix"]["quality_label"] == "GPS"
assert snapshot["fix"]["gsa_fix_type_label"] == "3D fix"
assert snapshot["satellites"]["used_count"] == 7
assert snapshot["satellites"]["in_view_count"] == 3
assert snapshot["satellites"]["snr"]["max"] == 42.0
assert snapshot["time"]["date"] == "1994-03-23"
assert snapshot["time"]["datetime_utc"] == "1994-03-23T12:35:19.000000+00:00"
assert set(snapshot["nmea"]["seen_sentence_types"]) == {"GGA", "GSA", "GSV", "RMC"}
def test_nmea_parser_rejects_bad_checksum_when_validation_enabled():
parser = NMEAParser(validate_checksum=True)
accepted = parser.ingest_sentence(
"$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*00"
)
snapshot = parser.snapshot()
assert accepted is False
assert snapshot["status"]["state"] == "error"
assert snapshot["nmea"]["invalid_checksum_count"] == 1
assert snapshot["status"]["last_error"] == "NMEA checksum mismatch"
def test_gps_service_file_source_reads_nmea_lines(tmp_path):
path = tmp_path / "gps_nmea.txt"
path.write_text(
"\n".join(
[
_sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230426,,"),
_sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,"),
]
),
encoding="utf-8",
)
service = GPSService(
{
"gps": {
"enabled": True,
"source": "file",
"source_path": str(path),
"poll_interval_seconds": 0.05,
"stale_after_seconds": 5.0,
}
}
)
service.start()
try:
deadline = time.time() + 1.0
snapshot = service.get_snapshot()
while snapshot["status"]["state"] == "no_data" and time.time() < deadline:
time.sleep(0.05)
snapshot = service.get_snapshot()
finally:
service.stop()
assert snapshot["status"]["state"] == "valid_fix"
assert snapshot["position"]["latitude"] == 42.83538333
assert snapshot["position"]["longitude"] == -71.1076
assert snapshot["satellites"]["used_count"] == 5
def test_gps_service_uses_manual_location_until_gps_fix():
service = GPSService(
{
"repeater": {
"latitude": 42.123456,
"longitude": -71.654321,
},
"gps": {
"enabled": True,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
)
snapshot = service.get_snapshot()
assert snapshot["status"]["state"] == "invalid_fix"
assert snapshot["position"]["latitude"] == 42.123456
assert snapshot["position"]["longitude"] == -71.654321
assert snapshot["gps_position"]["latitude"] == 42.83538333
assert snapshot["gps_position"]["longitude"] == -71.1076
assert snapshot["manual_position"]["latitude"] == 42.123456
assert snapshot["manual_position"]["longitude"] == -71.654321
assert snapshot["position_meta"]["source"] == "manual_config"
assert snapshot["position_meta"]["source_label"] == "manual config until GPS fix"
assert snapshot["position_meta"]["manual_config_available"] is True
assert snapshot["position_meta"]["gps_fix_valid"] is False
def test_gps_service_uses_gps_location_after_valid_fix():
service = GPSService(
{
"repeater": {
"latitude": 42.123456,
"longitude": -71.654321,
},
"gps": {
"enabled": True,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,")
)
snapshot = service.get_snapshot()
assert snapshot["status"]["state"] == "valid_fix"
assert snapshot["position"]["latitude"] == 42.83538333
assert snapshot["position"]["longitude"] == -71.1076
assert snapshot["manual_position"]["latitude"] == 42.123456
assert snapshot["manual_position"]["longitude"] == -71.654321
assert snapshot["position_meta"]["source"] == "gps"
assert snapshot["position_meta"]["source_label"] == "GPS fix"
assert snapshot["position_meta"]["manual_config_available"] is True
assert snapshot["position_meta"]["gps_fix_valid"] is True
def test_gps_service_treats_zero_zero_manual_location_as_unset():
service = GPSService(
{
"repeater": {
"latitude": 0.0,
"longitude": 0.0,
},
"gps": {
"enabled": True,
},
}
)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
)
snapshot = service.get_snapshot()
assert snapshot["status"]["state"] == "invalid_fix"
assert snapshot["position"]["latitude"] == 42.83538333
assert snapshot["position"]["longitude"] == -71.1076
assert snapshot["manual_position"] is None
assert snapshot["position_meta"]["source"] == "gps"
assert snapshot["position_meta"]["source_label"] == "GPS estimate"
assert snapshot["position_meta"]["manual_config_available"] is False
assert snapshot["position_meta"]["gps_fix_valid"] is False
def test_gps_service_reflects_runtime_manual_location_updates():
config = {
"repeater": {
"latitude": 0.0,
"longitude": 0.0,
},
"gps": {
"enabled": True,
},
}
service = GPSService(config)
assert service.ingest_sentence(
_sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
)
assert service.get_snapshot()["position_meta"]["source"] == "gps"
config["repeater"]["latitude"] = 42.123456
config["repeater"]["longitude"] = -71.654321
snapshot = service.get_snapshot()
assert snapshot["position"]["latitude"] == 42.123456
assert snapshot["position"]["longitude"] == -71.654321
assert snapshot["gps_position"]["latitude"] == 42.83538333
assert snapshot["gps_position"]["longitude"] == -71.1076
assert snapshot["position_meta"]["source"] == "manual_config"