diff --git a/config.yaml.example b/config.yaml.example
index 1060e7b..648acb1 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -126,6 +126,39 @@ repeater:
# Controls how long users stay logged in before needing to re-authenticate
jwt_expiry_minutes: 60
+# Local GPS diagnostics. When enabled, the daemon reads NMEA sentences from the
+# configured source and exposes parsed attributes at /api/gps and /gps.
+gps:
+ enabled: false
+
+ # Use repeater.latitude/repeater.longitude as the displayed receiver location
+ # until the GPS has a valid non-stale fix. This prevents early no-fix GPS
+ # estimates from moving the diagnostics globe/map away from the configured site.
+ # The default 0,0 repeater location is treated as unset.
+ use_manual_location_until_fix: true
+
+ # Source type:
+ # serial = read directly from an attached GPS module
+ # file = read NMEA lines from source_path, useful for gpsd/sidecar bridges
+ source: serial
+
+ # Serial source settings
+ device: "/dev/serial0"
+ baud_rate: 9600
+ read_timeout_seconds: 1.0
+ reconnect_interval_seconds: 5.0
+
+ # File source settings. The file can contain raw NMEA lines or JSON with a
+ # "sentences" list / "last_sentence" field.
+ source_path: "/var/lib/pymc_repeater/gps_nmea.txt"
+ poll_interval_seconds: 2.0
+
+ # Diagnostics behavior
+ stale_after_seconds: 10.0
+ retain_sentences: 25
+ validate_checksum: true
+ require_checksum: false
+
# Mesh Network Configuration
mesh:
# Unscoped flood policy - controls whether the repeater allows or denies unscoped flooding
diff --git a/pyproject.toml b/pyproject.toml
index 041c9fe..f768ef5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ dependencies = [
"paho-mqtt>=1.6.0",
"cherrypy-cors==1.7.0",
"psutil>=5.9.0",
+ "pyserial>=3.5",
"pyjwt>=2.8.0",
"ws4py>=0.6.0",
]
diff --git a/repeater/config.py b/repeater/config.py
index 3ce3670..b4c0b6e 100644
--- a/repeater/config.py
+++ b/repeater/config.py
@@ -79,6 +79,21 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"cert_store_dir": "/etc/pymc_repeater/glass",
}
+ if "gps" not in config:
+ config["gps"] = {
+ "enabled": False,
+ "use_manual_location_until_fix": True,
+ "source": "serial",
+ "device": "/dev/serial0",
+ "baud_rate": 9600,
+ "read_timeout_seconds": 1.0,
+ "reconnect_interval_seconds": 5.0,
+ "stale_after_seconds": 10.0,
+ "retain_sentences": 25,
+ "validate_checksum": True,
+ "require_checksum": False,
+ }
+
# Ensure repeater.security exists with defaults for upgrades from older configs
if "repeater" not in config:
config["repeater"] = {}
diff --git a/repeater/data_acquisition/__init__.py b/repeater/data_acquisition/__init__.py
index c013421..0c1f238 100644
--- a/repeater/data_acquisition/__init__.py
+++ b/repeater/data_acquisition/__init__.py
@@ -1,5 +1,6 @@
from .glass_handler import GlassHandler
+from .gps_service import GPSService
from .rrdtool_handler import RRDToolHandler
from .sqlite_handler import SQLiteHandler
from .storage_collector import StorageCollector
-__all__ = ["SQLiteHandler", "RRDToolHandler", "StorageCollector", "GlassHandler"]
+__all__ = ["SQLiteHandler", "RRDToolHandler", "StorageCollector", "GlassHandler", "GPSService"]
diff --git a/repeater/data_acquisition/gps_service.py b/repeater/data_acquisition/gps_service.py
new file mode 100644
index 0000000..f50d929
--- /dev/null
+++ b/repeater/data_acquisition/gps_service.py
@@ -0,0 +1,785 @@
+"""
+GPS/NMEA acquisition and diagnostics support.
+
+The service intentionally keeps the NMEA parser local and dependency-light. It
+accepts raw NMEA sentences from a serial receiver or from a file source used by
+external bridge processes, then exposes a JSON-serializable snapshot for the
+HTTP API and diagnostics UI.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import math
+import threading
+import time
+from collections import Counter, deque
+from copy import deepcopy
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Tuple
+
+logger = logging.getLogger("GPSService")
+
+
+FIX_QUALITY_LABELS = {
+ 0: "no fix",
+ 1: "GPS",
+ 2: "DGPS",
+ 4: "RTK fixed",
+ 5: "RTK float",
+ 6: "estimated",
+ 7: "manual",
+ 8: "simulation",
+}
+
+GSA_FIX_TYPE_LABELS = {
+ 1: "no fix",
+ 2: "2D fix",
+ 3: "3D fix",
+}
+
+
+def _to_float(value: Any) -> Optional[float]:
+ if value in (None, ""):
+ return None
+ try:
+ result = float(value)
+ except (TypeError, ValueError):
+ return None
+ return result if math.isfinite(result) else None
+
+
+def _to_int(value: Any) -> Optional[int]:
+ if value in (None, ""):
+ return None
+ try:
+ return int(value)
+ except (TypeError, ValueError):
+ return None
+
+
+def _is_valid_latitude(value: Optional[float]) -> bool:
+ return value is not None and -90.0 <= value <= 90.0
+
+
+def _is_valid_longitude(value: Optional[float]) -> bool:
+ return value is not None and -180.0 <= value <= 180.0
+
+
+def _is_zero_coordinate(latitude: Optional[float], longitude: Optional[float]) -> bool:
+ return latitude == 0.0 and longitude == 0.0
+
+
+def _nmea_checksum(payload: str) -> int:
+ checksum = 0
+ for char in payload:
+ checksum ^= ord(char)
+ return checksum
+
+
+def _parse_lat_lon(value: str, hemisphere: str) -> Optional[float]:
+ if not value or not hemisphere:
+ return None
+
+ dot = value.find(".")
+ if dot < 0:
+ dot = len(value)
+
+ # Latitude is ddmm.mmmm, longitude is dddmm.mmmm.
+ degree_digits = dot - 2
+ if degree_digits <= 0:
+ return None
+
+ try:
+ degrees = float(value[:degree_digits])
+ minutes = float(value[degree_digits:])
+ except ValueError:
+ return None
+
+ decimal = degrees + minutes / 60.0
+ if hemisphere.upper() in ("S", "W"):
+ decimal *= -1
+ return round(decimal, 8)
+
+
+def _format_time(value: str) -> Optional[str]:
+ if not value or len(value) < 6:
+ return None
+ try:
+ hour = int(value[0:2])
+ minute = int(value[2:4])
+ second_float = float(value[4:])
+ except ValueError:
+ return None
+ second = int(second_float)
+ microsecond = int(round((second_float - second) * 1_000_000))
+ if microsecond >= 1_000_000:
+ second += 1
+ microsecond -= 1_000_000
+ try:
+ return f"{hour:02d}:{minute:02d}:{second:02d}.{microsecond:06d}Z"
+ except ValueError:
+ return None
+
+
+def _format_date(value: str) -> Optional[str]:
+ if not value or len(value) != 6:
+ return None
+ try:
+ day = int(value[0:2])
+ month = int(value[2:4])
+ year_2 = int(value[4:6])
+ except ValueError:
+ return None
+
+ # NMEA RMC uses two-digit years. GPS modules in this project are modern,
+ # but keep 1980-2079 rollover behavior for old sample data.
+ year = 2000 + year_2 if year_2 < 80 else 1900 + year_2
+ try:
+ return datetime(year, month, day, tzinfo=timezone.utc).date().isoformat()
+ except ValueError:
+ return None
+
+
+def _combine_datetime_utc(date_value: Optional[str], time_value: Optional[str]) -> Optional[str]:
+ if not date_value or not time_value:
+ return None
+ try:
+ time_part = time_value.rstrip("Z")
+ return f"{date_value}T{time_part}+00:00"
+ except Exception:
+ return None
+
+
+class NMEAParser:
+ """Small NMEA parser focused on diagnostics fields used by GPS receivers."""
+
+ def __init__(
+ self,
+ *,
+ validate_checksum: bool = True,
+ require_checksum: bool = False,
+ retain_sentences: int = 25,
+ stale_after_seconds: float = 10.0,
+ ):
+ self.validate_checksum = validate_checksum
+ self.require_checksum = require_checksum
+ self.retain_sentences = max(0, int(retain_sentences))
+ self.stale_after_seconds = max(1.0, float(stale_after_seconds))
+ self._lock = threading.RLock()
+ self._reset_unlocked()
+
+ def _reset_unlocked(self):
+ self.position: Dict[str, Any] = {
+ "latitude": None,
+ "longitude": None,
+ "altitude_m": None,
+ "geoid_separation_m": None,
+ }
+ self.motion: Dict[str, Any] = {
+ "speed_knots": None,
+ "speed_kmh": None,
+ "course_degrees": None,
+ "magnetic_variation_degrees": None,
+ }
+ self.accuracy: Dict[str, Any] = {
+ "hdop": None,
+ "pdop": None,
+ "vdop": None,
+ }
+ self.time_data: Dict[str, Any] = {
+ "utc_time": None,
+ "date": None,
+ "datetime_utc": None,
+ }
+ self.fix: Dict[str, Any] = {
+ "valid": False,
+ "status": None,
+ "quality": None,
+ "quality_label": FIX_QUALITY_LABELS[0],
+ "gsa_fix_type": None,
+ "gsa_fix_type_label": None,
+ }
+ self.satellites: Dict[str, Any] = {
+ "used_count": None,
+ "used_prns": [],
+ "in_view_count": None,
+ "in_view": [],
+ "snr": {
+ "min": None,
+ "max": None,
+ "avg": None,
+ },
+ }
+ self.nmea: Dict[str, Any] = {
+ "last_sentence": None,
+ "last_sentence_type": None,
+ "last_talker": None,
+ "seen_sentence_types": [],
+ "sentence_counters": {},
+ "valid_checksum_count": 0,
+ "invalid_checksum_count": 0,
+ "missing_checksum_count": 0,
+ "recent_sentences": [],
+ }
+ self.raw_attributes: Dict[str, Any] = {}
+ self.last_update = None
+ self.last_error = None
+ self._sentence_counters: Counter = Counter()
+ self._recent_sentences = deque(maxlen=self.retain_sentences)
+ self._unhandled_sentence_types = set()
+
+ def reset(self):
+ with self._lock:
+ self._reset_unlocked()
+
+ def ingest_many(self, lines: Iterable[str]):
+ for line in lines:
+ self.ingest_sentence(line)
+
+ def ingest_sentence(self, sentence: str) -> bool:
+ parsed = self._split_sentence(sentence)
+ with self._lock:
+ if parsed is None:
+ return False
+
+ raw, payload, checksum_valid, talker, sentence_type, fields = parsed
+ if checksum_valid is False:
+ self.nmea["invalid_checksum_count"] += 1
+ self.last_error = "NMEA checksum mismatch"
+ if self.validate_checksum:
+ return False
+ elif checksum_valid is True:
+ self.nmea["valid_checksum_count"] += 1
+ else:
+ self.nmea["missing_checksum_count"] += 1
+ if self.require_checksum:
+ self.last_error = "NMEA checksum missing"
+ return False
+
+ now = time.time()
+ self.last_update = now
+ self.last_error = None
+ self._sentence_counters[sentence_type] += 1
+ self.nmea["last_sentence"] = raw
+ self.nmea["last_sentence_type"] = sentence_type
+ self.nmea["last_talker"] = talker
+ self.nmea["seen_sentence_types"] = sorted(self._sentence_counters.keys())
+ self.nmea["sentence_counters"] = dict(self._sentence_counters)
+ if self.retain_sentences:
+ self._recent_sentences.append(
+ {
+ "timestamp": datetime.fromtimestamp(now, timezone.utc).isoformat(),
+ "sentence_type": sentence_type,
+ "sentence": raw,
+ }
+ )
+ self.nmea["recent_sentences"] = list(self._recent_sentences)
+
+ handler = getattr(self, f"_parse_{sentence_type.lower()}", None)
+ if handler:
+ handler(fields)
+ else:
+ self._unhandled_sentence_types.add(sentence_type)
+ self.raw_attributes["unhandled_sentence_types"] = sorted(
+ self._unhandled_sentence_types
+ )
+
+ self._refresh_derived_unlocked()
+ return True
+
+ def _split_sentence(
+ self, sentence: str
+ ) -> Optional[Tuple[str, str, Optional[bool], str, str, List[str]]]:
+ raw = (sentence or "").strip()
+ if not raw:
+ return None
+
+ if raw.startswith("$"):
+ raw_no_dollar = raw[1:]
+ else:
+ raw_no_dollar = raw
+
+ payload = raw_no_dollar
+ checksum_valid: Optional[bool] = None
+ if "*" in raw_no_dollar:
+ payload, supplied_checksum = raw_no_dollar.split("*", 1)
+ supplied_checksum = supplied_checksum[:2]
+ try:
+ expected = int(supplied_checksum, 16)
+ checksum_valid = _nmea_checksum(payload) == expected
+ except ValueError:
+ checksum_valid = False
+
+ fields = payload.split(",")
+ if not fields or len(fields[0]) < 3:
+ return None
+
+ sentence_id = fields[0].upper()
+ talker = sentence_id[:-3] or None
+ sentence_type = sentence_id[-3:]
+ return raw, payload, checksum_valid, talker, sentence_type, fields
+
+ def _parse_rmc(self, fields: List[str]):
+ # $GPRMC,time,status,lat,N,lon,E,sog,cog,date,magvar,E/W,...
+ self.raw_attributes["RMC"] = {
+ "utc_time_raw": self._field(fields, 1),
+ "status": self._field(fields, 2),
+ "date_raw": self._field(fields, 9),
+ "mode": self._field(fields, 12),
+ "nav_status": self._field(fields, 13),
+ }
+ status = (self._field(fields, 2) or "").upper()
+ self.fix["status"] = "valid" if status == "A" else "invalid" if status == "V" else status
+ self.fix["valid"] = status == "A" or bool(self.fix.get("quality"))
+
+ latitude = _parse_lat_lon(self._field(fields, 3), self._field(fields, 4))
+ longitude = _parse_lat_lon(self._field(fields, 5), self._field(fields, 6))
+ if latitude is not None:
+ self.position["latitude"] = latitude
+ if longitude is not None:
+ self.position["longitude"] = longitude
+
+ speed_knots = _to_float(self._field(fields, 7))
+ if speed_knots is not None:
+ self.motion["speed_knots"] = speed_knots
+ self.motion["speed_kmh"] = round(speed_knots * 1.852, 3)
+
+ course = _to_float(self._field(fields, 8))
+ if course is not None:
+ self.motion["course_degrees"] = course
+
+ mag_var = _to_float(self._field(fields, 10))
+ mag_dir = (self._field(fields, 11) or "").upper()
+ if mag_var is not None:
+ self.motion["magnetic_variation_degrees"] = -mag_var if mag_dir == "W" else mag_var
+
+ utc_time = _format_time(self._field(fields, 1))
+ date = _format_date(self._field(fields, 9))
+ if utc_time:
+ self.time_data["utc_time"] = utc_time
+ if date:
+ self.time_data["date"] = date
+ self.time_data["datetime_utc"] = _combine_datetime_utc(
+ self.time_data.get("date"), self.time_data.get("utc_time")
+ )
+
+ def _parse_gga(self, fields: List[str]):
+ # $GPGGA,time,lat,N,lon,E,quality,num_sats,hdop,alt,M,geoid,M,...
+ quality = _to_int(self._field(fields, 6))
+ satellites_used = _to_int(self._field(fields, 7))
+ self.raw_attributes["GGA"] = {
+ "utc_time_raw": self._field(fields, 1),
+ "fix_quality_raw": self._field(fields, 6),
+ "dgps_age": self._field(fields, 13),
+ "dgps_station_id": self._field(fields, 14),
+ }
+ if quality is not None:
+ self.fix["quality"] = quality
+ self.fix["quality_label"] = FIX_QUALITY_LABELS.get(quality, f"quality {quality}")
+ self.fix["valid"] = quality > 0 or self.fix.get("status") == "valid"
+
+ latitude = _parse_lat_lon(self._field(fields, 2), self._field(fields, 3))
+ longitude = _parse_lat_lon(self._field(fields, 4), self._field(fields, 5))
+ if latitude is not None:
+ self.position["latitude"] = latitude
+ if longitude is not None:
+ self.position["longitude"] = longitude
+
+ if satellites_used is not None:
+ self.satellites["used_count"] = satellites_used
+
+ hdop = _to_float(self._field(fields, 8))
+ if hdop is not None:
+ self.accuracy["hdop"] = hdop
+
+ altitude = _to_float(self._field(fields, 9))
+ if altitude is not None:
+ self.position["altitude_m"] = altitude
+
+ geoid_sep = _to_float(self._field(fields, 11))
+ if geoid_sep is not None:
+ self.position["geoid_separation_m"] = geoid_sep
+
+ utc_time = _format_time(self._field(fields, 1))
+ if utc_time:
+ self.time_data["utc_time"] = utc_time
+ self.time_data["datetime_utc"] = _combine_datetime_utc(
+ self.time_data.get("date"), self.time_data.get("utc_time")
+ )
+
+ def _parse_gsa(self, fields: List[str]):
+ # $GPGSA,mode,fix_type,sv1..sv12,pdop,hdop,vdop
+ fix_type = _to_int(self._field(fields, 2))
+ used_prns = [value for value in fields[3:15] if value]
+ self.raw_attributes["GSA"] = {
+ "mode": self._field(fields, 1),
+ }
+ if fix_type is not None:
+ self.fix["gsa_fix_type"] = fix_type
+ self.fix["gsa_fix_type_label"] = GSA_FIX_TYPE_LABELS.get(fix_type, f"type {fix_type}")
+ self.fix["valid"] = fix_type > 1 or self.fix.get("valid", False)
+
+ self.satellites["used_prns"] = used_prns
+ if used_prns:
+ self.satellites["used_count"] = len(used_prns)
+
+ pdop = _to_float(self._field(fields, 15))
+ hdop = _to_float(self._field(fields, 16))
+ vdop = _to_float(self._field(fields, 17))
+ if pdop is not None:
+ self.accuracy["pdop"] = pdop
+ if hdop is not None:
+ self.accuracy["hdop"] = hdop
+ if vdop is not None:
+ self.accuracy["vdop"] = vdop
+
+ def _parse_gsv(self, fields: List[str]):
+ # $GPGSV,total_msgs,msg_num,sats_in_view,prn,elev,az,snr,...
+ total_messages = _to_int(self._field(fields, 1))
+ message_number = _to_int(self._field(fields, 2))
+ satellites_in_view = _to_int(self._field(fields, 3))
+ current = [] if message_number == 1 else list(self.satellites.get("in_view") or [])
+
+ for idx in range(4, len(fields), 4):
+ prn = self._field(fields, idx)
+ if not prn:
+ continue
+ satellite = {
+ "prn": prn,
+ "elevation_degrees": _to_int(self._field(fields, idx + 1)),
+ "azimuth_degrees": _to_int(self._field(fields, idx + 2)),
+ "snr_db": _to_float(self._field(fields, idx + 3)),
+ }
+ current.append(satellite)
+
+ # Deduplicate by PRN while preserving last value for each satellite.
+ by_prn = {sat["prn"]: sat for sat in current}
+ in_view = sorted(by_prn.values(), key=lambda item: item["prn"])
+ self.satellites["in_view"] = in_view
+ self.satellites["in_view_count"] = satellites_in_view
+ self.raw_attributes["GSV"] = {
+ "total_messages": total_messages,
+ "last_message_number": message_number,
+ }
+
+ def _parse_vtg(self, fields: List[str]):
+ # $GPVTG,cog,T,cog_magnetic,M,sog_knots,N,sog_kmh,K,...
+ course_true = _to_float(self._field(fields, 1))
+ course_magnetic = _to_float(self._field(fields, 3))
+ speed_knots = _to_float(self._field(fields, 5))
+ speed_kmh = _to_float(self._field(fields, 7))
+ if course_true is not None:
+ self.motion["course_degrees"] = course_true
+ if speed_knots is not None:
+ self.motion["speed_knots"] = speed_knots
+ if speed_kmh is not None:
+ self.motion["speed_kmh"] = speed_kmh
+ self.raw_attributes["VTG"] = {
+ "course_magnetic_degrees": course_magnetic,
+ "mode": self._field(fields, 9),
+ }
+
+ @staticmethod
+ def _field(fields: List[str], index: int) -> str:
+ return fields[index] if index < len(fields) else ""
+
+ def _normalize_raw_attributes_unlocked(self):
+ if self._unhandled_sentence_types:
+ self.raw_attributes["unhandled_sentence_types"] = sorted(self._unhandled_sentence_types)
+
+ def _refresh_derived_unlocked(self):
+ snrs = [
+ sat.get("snr_db")
+ for sat in self.satellites.get("in_view", [])
+ if sat.get("snr_db") is not None
+ ]
+ if snrs:
+ self.satellites["snr"] = {
+ "min": min(snrs),
+ "max": max(snrs),
+ "avg": round(sum(snrs) / len(snrs), 3),
+ }
+ else:
+ self.satellites["snr"] = {"min": None, "max": None, "avg": None}
+ self._normalize_raw_attributes_unlocked()
+
+ def snapshot(self) -> Dict[str, Any]:
+ with self._lock:
+ now = time.time()
+ age = now - self.last_update if self.last_update else None
+ stale = age is None or age > self.stale_after_seconds
+ fix_valid = bool(self.fix.get("valid")) and not stale
+ if self.last_error:
+ state = "error"
+ elif age is None:
+ state = "no_data"
+ elif stale:
+ state = "stale"
+ elif fix_valid:
+ state = "valid_fix"
+ else:
+ state = "invalid_fix"
+
+ return {
+ "status": {
+ "state": state,
+ "fix_valid": fix_valid,
+ "stale": stale,
+ "age_seconds": round(age, 3) if age is not None else None,
+ "last_update": (
+ datetime.fromtimestamp(self.last_update, timezone.utc).isoformat()
+ if self.last_update
+ else None
+ ),
+ "last_error": self.last_error,
+ },
+ "fix": deepcopy(self.fix),
+ "position": deepcopy(self.position),
+ "motion": deepcopy(self.motion),
+ "accuracy": deepcopy(self.accuracy),
+ "time": deepcopy(self.time_data),
+ "satellites": deepcopy(self.satellites),
+ "nmea": deepcopy(self.nmea),
+ "raw_attributes": deepcopy(self.raw_attributes),
+ }
+
+
+class GPSService:
+ """Runtime GPS acquisition service."""
+
+ def __init__(self, config: Dict[str, Any]):
+ gps_config = config.get("gps", {}) if isinstance(config, dict) else {}
+ repeater_config = config.get("repeater", {}) if isinstance(config, dict) else {}
+ self.config = gps_config
+ self.enabled = bool(gps_config.get("enabled", False))
+ self.use_manual_location_until_fix = bool(
+ gps_config.get("use_manual_location_until_fix", True)
+ )
+ self.source = str(gps_config.get("source", "serial")).lower()
+ self.device = gps_config.get("device", "/dev/serial0")
+ self.baud_rate = int(gps_config.get("baud_rate", 9600))
+ self.read_timeout_seconds = float(gps_config.get("read_timeout_seconds", 1.0))
+ self.reconnect_interval_seconds = float(gps_config.get("reconnect_interval_seconds", 5.0))
+ self.poll_interval_seconds = float(gps_config.get("poll_interval_seconds", 2.0))
+ self.source_path = gps_config.get("source_path") or gps_config.get("snapshot_path")
+ self.repeater_config = repeater_config
+ self.parser = NMEAParser(
+ validate_checksum=bool(gps_config.get("validate_checksum", True)),
+ require_checksum=bool(gps_config.get("require_checksum", False)),
+ retain_sentences=int(gps_config.get("retain_sentences", 25)),
+ stale_after_seconds=float(gps_config.get("stale_after_seconds", 10.0)),
+ )
+ self._stop_event = threading.Event()
+ self._thread: Optional[threading.Thread] = None
+ self._running = False
+ self._last_source_error: Optional[str] = None
+ self._last_file_content: Optional[str] = None
+
+ def start(self):
+ if not self.enabled:
+ return
+ if self._thread and self._thread.is_alive():
+ return
+
+ target = self._run_file_loop if self.source == "file" else self._run_serial_loop
+ self._stop_event.clear()
+ self._thread = threading.Thread(target=target, name="gps-service", daemon=True)
+ self._thread.start()
+ self._running = True
+ logger.info("GPS service started using %s source", self.source)
+
+ def stop(self, timeout: float = 2.0):
+ self._stop_event.set()
+ if self._thread and self._thread.is_alive():
+ self._thread.join(timeout=timeout)
+ self._running = False
+ logger.info("GPS service stopped")
+
+ def ingest_sentence(self, sentence: str) -> bool:
+ return self.parser.ingest_sentence(sentence)
+
+ def get_summary(self) -> Dict[str, Any]:
+ snapshot = self.get_snapshot()
+ return {
+ "enabled": snapshot["enabled"],
+ "source": snapshot["source"],
+ "status": snapshot["status"],
+ "fix": {
+ "valid": snapshot["fix"].get("valid"),
+ "quality": snapshot["fix"].get("quality"),
+ "quality_label": snapshot["fix"].get("quality_label"),
+ },
+ "position": snapshot["position"],
+ "position_meta": snapshot.get("position_meta"),
+ "gps_position": snapshot.get("gps_position"),
+ "manual_position": snapshot.get("manual_position"),
+ "satellites": {
+ "used_count": snapshot["satellites"].get("used_count"),
+ "in_view_count": snapshot["satellites"].get("in_view_count"),
+ "snr": snapshot["satellites"].get("snr"),
+ },
+ }
+
+ def get_snapshot(self) -> Dict[str, Any]:
+ snapshot = self.parser.snapshot()
+ self._apply_effective_position(snapshot)
+ snapshot.update(
+ {
+ "enabled": self.enabled,
+ "running": self._running and bool(self._thread and self._thread.is_alive()),
+ "source": {
+ "type": self.source,
+ "device": self.device if self.source == "serial" else None,
+ "baud_rate": self.baud_rate if self.source == "serial" else None,
+ "source_path": self.source_path if self.source == "file" else None,
+ "read_timeout_seconds": self.read_timeout_seconds,
+ "poll_interval_seconds": self.poll_interval_seconds,
+ "stale_after_seconds": self.parser.stale_after_seconds,
+ },
+ }
+ )
+ if not self.enabled:
+ snapshot["status"]["state"] = "disabled"
+ snapshot["status"]["last_error"] = None
+ elif self._last_source_error:
+ snapshot["status"]["last_error"] = self._last_source_error
+ if snapshot["status"]["state"] in ("no_data", "stale"):
+ snapshot["status"]["state"] = "error"
+ return snapshot
+
+ @staticmethod
+ def _extract_manual_position(repeater_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+ latitude = _to_float(repeater_config.get("latitude"))
+ longitude = _to_float(repeater_config.get("longitude"))
+ if not _is_valid_latitude(latitude) or not _is_valid_longitude(longitude):
+ return None
+ # The packaged default is 0,0. Treat it as unset so a fresh install does
+ # not show Null Island while the GPS is still searching for a fix.
+ if _is_zero_coordinate(latitude, longitude):
+ return None
+ return {
+ "latitude": latitude,
+ "longitude": longitude,
+ "altitude_m": None,
+ "geoid_separation_m": None,
+ "source": "manual_config",
+ }
+
+ def _apply_effective_position(self, snapshot: Dict[str, Any]):
+ gps_position = deepcopy(snapshot.get("position") or {})
+ manual_position = deepcopy(self._extract_manual_position(self.repeater_config))
+ gps_fix_valid = bool(snapshot.get("status", {}).get("fix_valid"))
+ use_manual = (
+ self.use_manual_location_until_fix
+ and manual_position is not None
+ and not gps_fix_valid
+ )
+
+ if use_manual:
+ effective_position = {
+ **gps_position,
+ "latitude": manual_position["latitude"],
+ "longitude": manual_position["longitude"],
+ }
+ position_source = "manual_config"
+ position_source_label = "manual config until GPS fix"
+ else:
+ effective_position = gps_position
+ position_source = "gps"
+ position_source_label = "GPS fix" if gps_fix_valid else "GPS estimate"
+
+ snapshot["gps_position"] = gps_position
+ snapshot["manual_position"] = manual_position
+ snapshot["position"] = effective_position
+ snapshot["position_meta"] = {
+ "source": position_source,
+ "source_label": position_source_label,
+ "policy": "manual_until_gps_fix"
+ if self.use_manual_location_until_fix
+ else "gps_only",
+ "manual_config_available": manual_position is not None,
+ "gps_fix_valid": gps_fix_valid,
+ }
+
+ def _set_source_error(self, message: Optional[str]):
+ self._last_source_error = message
+ if message:
+ self.parser.last_error = message
+ logger.warning("GPS source error: %s", message)
+ else:
+ self.parser.last_error = None
+
+ def _run_serial_loop(self):
+ try:
+ import serial # type: ignore
+ except ImportError:
+ self._set_source_error("pyserial is not installed")
+ self._running = False
+ return
+
+ while not self._stop_event.is_set():
+ try:
+ with serial.Serial(
+ self.device,
+ self.baud_rate,
+ timeout=self.read_timeout_seconds,
+ ) as port:
+ self._set_source_error(None)
+ while not self._stop_event.is_set():
+ line = port.readline()
+ if not line:
+ continue
+ sentence = line.decode("ascii", errors="ignore").strip()
+ if sentence:
+ self.ingest_sentence(sentence)
+ except Exception as exc:
+ self._set_source_error(f"{type(exc).__name__}: {exc}")
+ self._stop_event.wait(self.reconnect_interval_seconds)
+
+ self._running = False
+
+ def _run_file_loop(self):
+ if not self.source_path:
+ self._set_source_error("gps.source_path is required for file source")
+ self._running = False
+ return
+
+ path = Path(self.source_path)
+ while not self._stop_event.is_set():
+ try:
+ content = path.read_text(encoding="utf-8")
+ if content != self._last_file_content:
+ self._last_file_content = content
+ self._set_source_error(None)
+ lines = self._extract_file_sentences(content)
+ self.parser.ingest_many(lines)
+ except FileNotFoundError:
+ self._set_source_error(f"GPS source file not found: {path}")
+ except Exception as exc:
+ self._set_source_error(f"{type(exc).__name__}: {exc}")
+ self._stop_event.wait(self.poll_interval_seconds)
+
+ self._running = False
+
+ @staticmethod
+ def _extract_file_sentences(content: str) -> List[str]:
+ stripped = content.strip()
+ if not stripped:
+ return []
+ if stripped.startswith("{"):
+ payload = json.loads(stripped)
+ if isinstance(payload.get("sentences"), list):
+ return [str(item) for item in payload["sentences"]]
+ if payload.get("last_sentence"):
+ return [str(payload["last_sentence"])]
+ if payload.get("nmea"):
+ nmea = payload["nmea"]
+ if isinstance(nmea, dict) and nmea.get("last_sentence"):
+ return [str(nmea["last_sentence"])]
+ if isinstance(nmea, list):
+ return [str(item) for item in nmea]
+ return []
+ return [line.strip() for line in stripped.splitlines() if line.strip()]
diff --git a/repeater/main.py b/repeater/main.py
index 2910b28..856c6da 100644
--- a/repeater/main.py
+++ b/repeater/main.py
@@ -11,6 +11,7 @@ from repeater.companion.utils import validate_companion_node_name, normalize_com
from repeater.config import get_radio_for_board, load_config, save_config
from repeater.config_manager import ConfigManager
from repeater.data_acquisition.glass_handler import GlassHandler
+from repeater.data_acquisition.gps_service import GPSService
from repeater.engine import RepeaterHandler
from repeater.handler_helpers import (
AdvertHelper,
@@ -49,6 +50,7 @@ class RepeaterDaemon:
self.path_helper = None
self.protocol_request_helper = None
self.glass_handler = None
+ self.gps_service = None
self.acl = None
self.router = None
self.companion_bridges: dict[int, object] = {}
@@ -262,6 +264,13 @@ class RepeaterDaemon:
)
logger.info("Config manager initialized")
+ self.gps_service = GPSService(self.config)
+ self.gps_service.start()
+ if self.config.get("gps", {}).get("enabled", False):
+ logger.info("GPS diagnostics initialized")
+ else:
+ logger.info("GPS diagnostics disabled")
+
# Initialize text message helper with per-identity ACLs
self.text_helper = TextHelper(
identity_manager=self.identity_manager,
@@ -916,6 +925,9 @@ class RepeaterDaemon:
except Exception:
stats["public_key"] = None
+ if self.gps_service:
+ stats["gps"] = self.gps_service.get_summary()
+
return stats
async def _get_companion_stats(self, stats_type: int) -> dict:
@@ -1076,6 +1088,13 @@ class RepeaterDaemon:
except Exception as e:
logger.warning(f"Error stopping Glass handler: {e}")
+ # Stop GPS diagnostics.
+ if self.gps_service:
+ try:
+ self.gps_service.stop()
+ except Exception as e:
+ logger.warning(f"Error stopping GPS diagnostics: {e}")
+
# Close storage publishers (MQTT/LetsMesh) to stop their worker threads.
try:
if self.repeater_handler and self.repeater_handler.storage:
diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py
index 4776038..386969f 100644
--- a/repeater/web/api_endpoints.py
+++ b/repeater/web/api_endpoints.py
@@ -41,6 +41,7 @@ logger = logging.getLogger("HTTPServer")
# System
# GET /api/stats - Get system statistics
+# GET /api/gps - Get local GPS diagnostics and parsed NMEA attributes
# GET /api/logs - Get system logs
# GET /api/hardware_stats - Get hardware statistics
# GET /api/hardware_processes - Get process information
@@ -628,6 +629,75 @@ class APIEndpoints:
logger.error(f"Error serving stats: {e}")
return {"error": str(e)}
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def gps(self):
+ """Get full local GPS diagnostics and parsed NMEA attributes."""
+ try:
+ gps_service = getattr(self.daemon_instance, "gps_service", None)
+ if gps_service:
+ return self._success(gps_service.get_snapshot())
+
+ return self._success(
+ {
+ "enabled": False,
+ "running": False,
+ "source": self.config.get("gps", {}),
+ "status": {
+ "state": "disabled",
+ "fix_valid": False,
+ "stale": True,
+ "age_seconds": None,
+ "last_update": None,
+ "last_error": "GPS service is not initialized",
+ },
+ "fix": {
+ "valid": False,
+ "status": None,
+ "quality": None,
+ "quality_label": "no fix",
+ "gsa_fix_type": None,
+ "gsa_fix_type_label": None,
+ },
+ "position": {
+ "latitude": None,
+ "longitude": None,
+ "altitude_m": None,
+ "geoid_separation_m": None,
+ },
+ "motion": {
+ "speed_knots": None,
+ "speed_kmh": None,
+ "course_degrees": None,
+ "magnetic_variation_degrees": None,
+ },
+ "accuracy": {"hdop": None, "pdop": None, "vdop": None},
+ "time": {"utc_time": None, "date": None, "datetime_utc": None},
+ "satellites": {
+ "used_count": None,
+ "used_prns": [],
+ "in_view_count": None,
+ "in_view": [],
+ "snr": {"min": None, "max": None, "avg": None},
+ },
+ "nmea": {
+ "last_sentence": None,
+ "last_sentence_type": None,
+ "last_talker": None,
+ "seen_sentence_types": [],
+ "sentence_counters": {},
+ "valid_checksum_count": 0,
+ "invalid_checksum_count": 0,
+ "missing_checksum_count": 0,
+ "recent_sentences": [],
+ },
+ "raw_attributes": {},
+ }
+ )
+ except Exception as e:
+ logger.error(f"Error serving GPS diagnostics: {e}", exc_info=True)
+ return self._error(e)
+
@cherrypy.expose
@cherrypy.tools.json_out()
def send_advert(self):
diff --git a/repeater/web/html/gps.html b/repeater/web/html/gps.html
new file mode 100644
index 0000000..c9b8987
--- /dev/null
+++ b/repeater/web/html/gps.html
@@ -0,0 +1,1275 @@
+
+
+
+
+
+ pyMC GPS Diagnostics
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Fix State
+
Loading
+
Waiting for /api/gps
+
+
+
Coordinates
+
n/a
+
latitude, longitude
+
+
+
Satellites
+
n/a
+
used / in view
+
+
+
Freshness
+
n/a
+
last update unknown
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Satellites In View + SNR
+
+
+
+
Recent NMEA Sentences
+
+
+
+
+
+
+
+
+
+
+
diff --git a/repeater/web/http_server.py b/repeater/web/http_server.py
index 2067313..720f6f7 100644
--- a/repeater/web/http_server.py
+++ b/repeater/web/http_server.py
@@ -152,6 +152,19 @@ class StatsApp:
logger.error(f"Error serving index.html: {e}")
raise cherrypy.HTTPError(500, "Internal server error")
+ @cherrypy.expose
+ def gps(self, **kwargs):
+ """Serve the standalone GPS diagnostics page."""
+ gps_path = os.path.join(self.html_dir, "gps.html")
+ try:
+ with open(gps_path, "r", encoding="utf-8") as f:
+ return f.read()
+ except FileNotFoundError:
+ raise cherrypy.HTTPError(404, "GPS diagnostics page not found.")
+ except Exception as e:
+ logger.error(f"Error serving gps.html: {e}")
+ raise cherrypy.HTTPError(500, "Internal server error")
+
@cherrypy.expose
def default(self, *args, **kwargs):
"""Handle client-side routing - serve index.html for all non-API routes."""
diff --git a/repeater/web/openapi.yaml b/repeater/web/openapi.yaml
index a353b0f..4a8ec63 100644
--- a/repeater/web/openapi.yaml
+++ b/repeater/web/openapi.yaml
@@ -36,6 +36,8 @@ tags:
description: User authentication and API token management
- name: System
description: System statistics and control
+ - name: GPS
+ description: Local GPS receiver diagnostics
- name: Packets
description: Packet history and statistics
- name: Charts
@@ -318,6 +320,73 @@ paths:
type: string
example: "0.5.0"
+ /gps:
+ get:
+ tags: [GPS]
+ summary: Get local GPS diagnostics
+ description: Returns parsed NMEA fix, position, motion, accuracy, satellites, and raw sentence health.
+ security:
+ - BearerAuth: []
+ - ApiKeyAuth: []
+ responses:
+ '200':
+ description: GPS diagnostics snapshot
+ content:
+ application/json:
+ schema:
+ type: object
+ properties:
+ success:
+ type: boolean
+ data:
+ type: object
+ properties:
+ enabled:
+ type: boolean
+ running:
+ type: boolean
+ status:
+ type: object
+ fix:
+ type: object
+ position:
+ type: object
+ description: Effective receiver position used by the diagnostics UI
+ gps_position:
+ type: object
+ description: Raw position reported by the GPS receiver, even before a valid fix
+ manual_position:
+ type: object
+ nullable: true
+ description: Configured repeater latitude/longitude, when set to a non-zero coordinate
+ position_meta:
+ type: object
+ properties:
+ source:
+ type: string
+ enum: [gps, manual_config]
+ source_label:
+ type: string
+ policy:
+ type: string
+ enum: [manual_until_gps_fix, gps_only]
+ manual_config_available:
+ type: boolean
+ gps_fix_valid:
+ type: boolean
+ motion:
+ type: object
+ accuracy:
+ type: object
+ time:
+ type: object
+ satellites:
+ type: object
+ nmea:
+ type: object
+ raw_attributes:
+ type: object
+
/send_advert:
post:
tags: [System]
diff --git a/tests/test_gps_service.py b/tests/test_gps_service.py
new file mode 100644
index 0000000..99dd2c7
--- /dev/null
+++ b/tests/test_gps_service.py
@@ -0,0 +1,230 @@
+import time
+import importlib.util
+from pathlib import Path
+
+_MODULE_PATH = Path(__file__).resolve().parents[1] / "repeater" / "data_acquisition" / "gps_service.py"
+_SPEC = importlib.util.spec_from_file_location("repeater_gps_service", _MODULE_PATH)
+_MODULE = importlib.util.module_from_spec(_SPEC)
+assert _SPEC and _SPEC.loader
+_SPEC.loader.exec_module(_MODULE)
+GPSService = _MODULE.GPSService
+NMEAParser = _MODULE.NMEAParser
+
+
+def _sentence(payload: str) -> str:
+ checksum = 0
+ for char in payload:
+ checksum ^= ord(char)
+ return f"${payload}*{checksum:02X}"
+
+
+def test_nmea_parser_combines_rmc_gga_gsa_gsv_attributes():
+ parser = NMEAParser()
+
+ assert parser.ingest_sentence(
+ _sentence("GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W")
+ )
+ assert parser.ingest_sentence(
+ _sentence("GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,")
+ )
+ assert parser.ingest_sentence(
+ _sentence("GPGSA,A,3,04,05,09,12,24,25,29,,,,,,1.8,1.0,1.5")
+ )
+ assert parser.ingest_sentence(
+ _sentence("GPGSV,1,1,03,04,77,045,42,05,13,180,35,09,07,095,29")
+ )
+
+ snapshot = parser.snapshot()
+
+ assert snapshot["status"]["state"] == "valid_fix"
+ assert snapshot["position"]["latitude"] == 48.1173
+ assert snapshot["position"]["longitude"] == 11.51666667
+ assert snapshot["position"]["altitude_m"] == 545.4
+ assert snapshot["motion"]["speed_knots"] == 22.4
+ assert snapshot["motion"]["speed_kmh"] == 41.485
+ assert snapshot["motion"]["course_degrees"] == 84.4
+ assert snapshot["motion"]["magnetic_variation_degrees"] == -3.1
+ assert snapshot["accuracy"]["hdop"] == 1.0
+ assert snapshot["accuracy"]["pdop"] == 1.8
+ assert snapshot["accuracy"]["vdop"] == 1.5
+ assert snapshot["fix"]["quality"] == 1
+ assert snapshot["fix"]["quality_label"] == "GPS"
+ assert snapshot["fix"]["gsa_fix_type_label"] == "3D fix"
+ assert snapshot["satellites"]["used_count"] == 7
+ assert snapshot["satellites"]["in_view_count"] == 3
+ assert snapshot["satellites"]["snr"]["max"] == 42.0
+ assert snapshot["time"]["date"] == "1994-03-23"
+ assert snapshot["time"]["datetime_utc"] == "1994-03-23T12:35:19.000000+00:00"
+ assert set(snapshot["nmea"]["seen_sentence_types"]) == {"GGA", "GSA", "GSV", "RMC"}
+
+
+def test_nmea_parser_rejects_bad_checksum_when_validation_enabled():
+ parser = NMEAParser(validate_checksum=True)
+
+ accepted = parser.ingest_sentence(
+ "$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*00"
+ )
+
+ snapshot = parser.snapshot()
+ assert accepted is False
+ assert snapshot["status"]["state"] == "error"
+ assert snapshot["nmea"]["invalid_checksum_count"] == 1
+ assert snapshot["status"]["last_error"] == "NMEA checksum mismatch"
+
+
+def test_gps_service_file_source_reads_nmea_lines(tmp_path):
+ path = tmp_path / "gps_nmea.txt"
+ path.write_text(
+ "\n".join(
+ [
+ _sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230426,,"),
+ _sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,"),
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ service = GPSService(
+ {
+ "gps": {
+ "enabled": True,
+ "source": "file",
+ "source_path": str(path),
+ "poll_interval_seconds": 0.05,
+ "stale_after_seconds": 5.0,
+ }
+ }
+ )
+ service.start()
+ try:
+ deadline = time.time() + 1.0
+ snapshot = service.get_snapshot()
+ while snapshot["status"]["state"] == "no_data" and time.time() < deadline:
+ time.sleep(0.05)
+ snapshot = service.get_snapshot()
+ finally:
+ service.stop()
+
+ assert snapshot["status"]["state"] == "valid_fix"
+ assert snapshot["position"]["latitude"] == 42.83538333
+ assert snapshot["position"]["longitude"] == -71.1076
+ assert snapshot["satellites"]["used_count"] == 5
+
+
+def test_gps_service_uses_manual_location_until_gps_fix():
+ service = GPSService(
+ {
+ "repeater": {
+ "latitude": 42.123456,
+ "longitude": -71.654321,
+ },
+ "gps": {
+ "enabled": True,
+ },
+ }
+ )
+
+ assert service.ingest_sentence(
+ _sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
+ )
+
+ snapshot = service.get_snapshot()
+
+ assert snapshot["status"]["state"] == "invalid_fix"
+ assert snapshot["position"]["latitude"] == 42.123456
+ assert snapshot["position"]["longitude"] == -71.654321
+ assert snapshot["gps_position"]["latitude"] == 42.83538333
+ assert snapshot["gps_position"]["longitude"] == -71.1076
+ assert snapshot["manual_position"]["latitude"] == 42.123456
+ assert snapshot["manual_position"]["longitude"] == -71.654321
+ assert snapshot["position_meta"]["source"] == "manual_config"
+ assert snapshot["position_meta"]["source_label"] == "manual config until GPS fix"
+ assert snapshot["position_meta"]["manual_config_available"] is True
+ assert snapshot["position_meta"]["gps_fix_valid"] is False
+
+
+def test_gps_service_uses_gps_location_after_valid_fix():
+ service = GPSService(
+ {
+ "repeater": {
+ "latitude": 42.123456,
+ "longitude": -71.654321,
+ },
+ "gps": {
+ "enabled": True,
+ },
+ }
+ )
+
+ assert service.ingest_sentence(
+ _sentence("GPGGA,010203,4250.123,N,07106.456,W,1,05,1.4,32.0,M,0.0,M,,")
+ )
+
+ snapshot = service.get_snapshot()
+
+ assert snapshot["status"]["state"] == "valid_fix"
+ assert snapshot["position"]["latitude"] == 42.83538333
+ assert snapshot["position"]["longitude"] == -71.1076
+ assert snapshot["manual_position"]["latitude"] == 42.123456
+ assert snapshot["manual_position"]["longitude"] == -71.654321
+ assert snapshot["position_meta"]["source"] == "gps"
+ assert snapshot["position_meta"]["source_label"] == "GPS fix"
+ assert snapshot["position_meta"]["manual_config_available"] is True
+ assert snapshot["position_meta"]["gps_fix_valid"] is True
+
+
+def test_gps_service_treats_zero_zero_manual_location_as_unset():
+ service = GPSService(
+ {
+ "repeater": {
+ "latitude": 0.0,
+ "longitude": 0.0,
+ },
+ "gps": {
+ "enabled": True,
+ },
+ }
+ )
+
+ assert service.ingest_sentence(
+ _sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
+ )
+
+ snapshot = service.get_snapshot()
+
+ assert snapshot["status"]["state"] == "invalid_fix"
+ assert snapshot["position"]["latitude"] == 42.83538333
+ assert snapshot["position"]["longitude"] == -71.1076
+ assert snapshot["manual_position"] is None
+ assert snapshot["position_meta"]["source"] == "gps"
+ assert snapshot["position_meta"]["source_label"] == "GPS estimate"
+ assert snapshot["position_meta"]["manual_config_available"] is False
+ assert snapshot["position_meta"]["gps_fix_valid"] is False
+
+
+def test_gps_service_reflects_runtime_manual_location_updates():
+ config = {
+ "repeater": {
+ "latitude": 0.0,
+ "longitude": 0.0,
+ },
+ "gps": {
+ "enabled": True,
+ },
+ }
+ service = GPSService(config)
+
+ assert service.ingest_sentence(
+ _sentence("GPGGA,010203,4250.123,N,07106.456,W,0,00,8.8,32.0,M,0.0,M,,")
+ )
+ assert service.get_snapshot()["position_meta"]["source"] == "gps"
+
+ config["repeater"]["latitude"] = 42.123456
+ config["repeater"]["longitude"] = -71.654321
+ snapshot = service.get_snapshot()
+
+ assert snapshot["position"]["latitude"] == 42.123456
+ assert snapshot["position"]["longitude"] == -71.654321
+ assert snapshot["gps_position"]["latitude"] == 42.83538333
+ assert snapshot["gps_position"]["longitude"] == -71.1076
+ assert snapshot["position_meta"]["source"] == "manual_config"