feat: sync system time from GPS

This commit is contained in:
Mitchell Moss
2026-04-24 08:34:30 -04:00
parent 9cd2de94e8
commit 8ae1c0f65f
7 changed files with 337 additions and 5 deletions
+7
View File
@@ -159,6 +159,13 @@ gps:
validate_checksum: true
require_checksum: false
# Automatically set the Linux system clock from GPS UTC time once the receiver
# has a valid non-stale fix. The systemd service grants CAP_SYS_TIME for this.
time_sync_enabled: true
time_sync_interval_seconds: 3600.0
time_sync_min_offset_seconds: 1.0
time_sync_min_valid_year: 2020
# Mesh Network Configuration
mesh:
# Unscoped flood policy - controls whether the repeater allows or denies unscoped flooding
+4
View File
@@ -11,5 +11,9 @@ ExecStart=/usr/bin/pymc-repeater
Restart=always
RestartSec=10
# Allow GPS time sync to update CLOCK_REALTIME without running as root
CapabilityBoundingSet=CAP_SYS_TIME
AmbientCapabilities=CAP_SYS_TIME
[Install]
WantedBy=multi-user.target
+4
View File
@@ -34,5 +34,9 @@ SyslogIdentifier=pymc-repeater
ReadWritePaths=/var/log/pymc_repeater /var/lib/pymc_repeater /etc/pymc_repeater
SupplementaryGroups=plugdev dialout
# Allow GPS time sync to update CLOCK_REALTIME without running as root
CapabilityBoundingSet=CAP_SYS_TIME
AmbientCapabilities=CAP_SYS_TIME
[Install]
WantedBy=multi-user.target
+4
View File
@@ -92,6 +92,10 @@ def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"retain_sentences": 25,
"validate_checksum": True,
"require_checksum": False,
"time_sync_enabled": True,
"time_sync_interval_seconds": 3600.0,
"time_sync_min_offset_seconds": 1.0,
"time_sync_min_valid_year": 2020,
}
# Ensure repeater.security exists with defaults for upgrades from older configs
+175 -4
View File
@@ -18,7 +18,7 @@ from collections import Counter, deque
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
logger = logging.getLogger("GPSService")
@@ -153,6 +153,24 @@ def _combine_datetime_utc(date_value: Optional[str], time_value: Optional[str])
return None
def _parse_datetime_utc(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _set_system_clock_from_datetime(value: datetime):
if not hasattr(time, "clock_settime") or not hasattr(time, "CLOCK_REALTIME"):
raise RuntimeError("time.clock_settime(CLOCK_REALTIME) is not available")
time.clock_settime(time.CLOCK_REALTIME, value.timestamp())
class NMEAParser:
"""Small NMEA parser focused on diagnostics fields used by GPS receivers."""
@@ -550,7 +568,13 @@ class NMEAParser:
class GPSService:
"""Runtime GPS acquisition service."""
def __init__(self, config: Dict[str, Any]):
def __init__(
self,
config: Dict[str, Any],
*,
clock_setter: Optional[Callable[[datetime], None]] = None,
time_provider: Optional[Callable[[], float]] = None,
):
gps_config = config.get("gps", {}) if isinstance(config, dict) else {}
repeater_config = config.get("repeater", {}) if isinstance(config, dict) else {}
self.config = gps_config
@@ -566,6 +590,30 @@ class GPSService:
self.poll_interval_seconds = float(gps_config.get("poll_interval_seconds", 2.0))
self.source_path = gps_config.get("source_path") or gps_config.get("snapshot_path")
self.repeater_config = repeater_config
self.time_sync_enabled = bool(gps_config.get("time_sync_enabled", True))
self.time_sync_interval_seconds = max(
1.0, float(gps_config.get("time_sync_interval_seconds", 3600.0))
)
self.time_sync_min_offset_seconds = max(
0.0, float(gps_config.get("time_sync_min_offset_seconds", 1.0))
)
self.time_sync_min_valid_year = int(gps_config.get("time_sync_min_valid_year", 2020))
self._clock_setter = clock_setter or _set_system_clock_from_datetime
self._time_provider = time_provider or time.time
self._time_sync_lock = threading.RLock()
self._last_time_sync_monotonic: Optional[float] = None
self._time_sync_status: Dict[str, Any] = {
"enabled": self.time_sync_enabled,
"state": "disabled" if not self.time_sync_enabled else "waiting_for_fix",
"last_attempt": None,
"last_success": None,
"last_error": None,
"last_gps_time": None,
"last_offset_seconds": None,
"interval_seconds": self.time_sync_interval_seconds,
"min_offset_seconds": self.time_sync_min_offset_seconds,
"min_valid_year": self.time_sync_min_valid_year,
}
self.parser = NMEAParser(
validate_checksum=bool(gps_config.get("validate_checksum", True)),
require_checksum=bool(gps_config.get("require_checksum", False)),
@@ -599,7 +647,10 @@ class GPSService:
logger.info("GPS service stopped")
def ingest_sentence(self, sentence: str) -> bool:
return self.parser.ingest_sentence(sentence)
accepted = self.parser.ingest_sentence(sentence)
if accepted:
self._maybe_sync_system_time()
return accepted
def get_summary(self) -> Dict[str, Any]:
snapshot = self.get_snapshot()
@@ -616,6 +667,7 @@ class GPSService:
"position_meta": snapshot.get("position_meta"),
"gps_position": snapshot.get("gps_position"),
"manual_position": snapshot.get("manual_position"),
"time_sync": snapshot.get("time_sync"),
"satellites": {
"used_count": snapshot["satellites"].get("used_count"),
"in_view_count": snapshot["satellites"].get("in_view_count"),
@@ -639,6 +691,7 @@ class GPSService:
"poll_interval_seconds": self.poll_interval_seconds,
"stale_after_seconds": self.parser.stale_after_seconds,
},
"time_sync": self._get_time_sync_status(snapshot),
}
)
if not self.enabled:
@@ -712,6 +765,123 @@ class GPSService:
else:
self.parser.last_error = None
def _get_time_sync_status(self, snapshot: Dict[str, Any]) -> Dict[str, Any]:
with self._time_sync_lock:
status = deepcopy(self._time_sync_status)
if not self.enabled:
status["state"] = "disabled"
return status
if not self.time_sync_enabled:
status["state"] = "disabled"
return status
if status.get("state") in ("synced", "error", "in_sync", "ignored"):
return status
if not snapshot.get("status", {}).get("fix_valid"):
status["state"] = "waiting_for_fix"
elif not snapshot.get("time", {}).get("datetime_utc"):
status["state"] = "waiting_for_time"
else:
status["state"] = "ready"
return status
def _record_time_sync_status(
self,
*,
state: str,
gps_time: Optional[datetime] = None,
offset_seconds: Optional[float] = None,
error: Optional[str] = None,
success: bool = False,
):
timestamp = datetime.now(timezone.utc).isoformat()
with self._time_sync_lock:
self._time_sync_status.update(
{
"enabled": self.time_sync_enabled,
"state": state,
"last_attempt": timestamp,
"last_error": error,
"last_gps_time": gps_time.isoformat() if gps_time else None,
"last_offset_seconds": (
round(offset_seconds, 3) if offset_seconds is not None else None
),
"interval_seconds": self.time_sync_interval_seconds,
"min_offset_seconds": self.time_sync_min_offset_seconds,
"min_valid_year": self.time_sync_min_valid_year,
}
)
if success:
self._time_sync_status["last_success"] = timestamp
def _maybe_sync_system_time(self):
if not self.enabled or not self.time_sync_enabled:
return
now_monotonic = time.monotonic()
with self._time_sync_lock:
if (
self._last_time_sync_monotonic is not None
and now_monotonic - self._last_time_sync_monotonic
< self.time_sync_interval_seconds
):
return
snapshot = self.parser.snapshot()
if not snapshot.get("status", {}).get("fix_valid"):
return
gps_time = _parse_datetime_utc(snapshot.get("time", {}).get("datetime_utc"))
if gps_time is None:
return
if gps_time.year < self.time_sync_min_valid_year:
self._record_time_sync_status(
state="ignored",
gps_time=gps_time,
error=(
f"GPS time year {gps_time.year} is older than "
f"minimum {self.time_sync_min_valid_year}"
),
)
return
system_now = self._time_provider()
offset_seconds = gps_time.timestamp() - system_now
self._last_time_sync_monotonic = now_monotonic
if abs(offset_seconds) < self.time_sync_min_offset_seconds:
self._record_time_sync_status(
state="in_sync",
gps_time=gps_time,
offset_seconds=offset_seconds,
success=True,
)
return
try:
self._clock_setter(gps_time)
except Exception as exc:
self._record_time_sync_status(
state="error",
gps_time=gps_time,
offset_seconds=offset_seconds,
error=f"{type(exc).__name__}: {exc}",
)
logger.warning("GPS system time sync failed: %s", exc)
return
self._record_time_sync_status(
state="synced",
gps_time=gps_time,
offset_seconds=offset_seconds,
success=True,
)
logger.info(
"System clock synchronized from GPS time %s (offset %.3fs)",
gps_time.isoformat(),
offset_seconds,
)
def _run_serial_loop(self):
try:
import serial # type: ignore
@@ -755,7 +925,8 @@ class GPSService:
self._last_file_content = content
self._set_source_error(None)
lines = self._extract_file_sentences(content)
self.parser.ingest_many(lines)
for line in lines:
self.ingest_sentence(line)
except FileNotFoundError:
self._set_source_error(f"GPS source file not found: {path}")
except Exception as exc:
+24
View File
@@ -380,6 +380,30 @@ paths:
type: object
time:
type: object
time_sync:
type: object
description: GPS-to-system-clock sync status
properties:
enabled:
type: boolean
state:
type: string
enum: [disabled, waiting_for_fix, waiting_for_time, ready, in_sync, synced, error, ignored]
last_attempt:
type: string
nullable: true
last_success:
type: string
nullable: true
last_error:
type: string
nullable: true
last_gps_time:
type: string
nullable: true
last_offset_seconds:
type: number
nullable: true
satellites:
type: object
nmea:
+119 -1
View File
@@ -1,5 +1,6 @@
import time
import importlib.util
import time
from datetime import datetime, timezone
from pathlib import Path
_MODULE_PATH = Path(__file__).resolve().parents[1] / "repeater" / "data_acquisition" / "gps_service.py"
@@ -92,6 +93,7 @@ def test_gps_service_file_source_reads_nmea_lines(tmp_path):
"source_path": str(path),
"poll_interval_seconds": 0.05,
"stale_after_seconds": 5.0,
"time_sync_enabled": False,
}
}
)
@@ -143,6 +145,122 @@ def test_gps_service_uses_manual_location_until_gps_fix():
assert snapshot["position_meta"]["gps_fix_valid"] is False
def test_gps_service_sets_system_time_from_valid_gps_datetime():
clock_calls = []
system_now = datetime(2026, 4, 23, 1, 1, 0, tzinfo=timezone.utc).timestamp()
service = GPSService(
{
"gps": {
"enabled": True,
"time_sync_enabled": True,
"time_sync_interval_seconds": 60.0,
"time_sync_min_offset_seconds": 1.0,
},
},
clock_setter=clock_calls.append,
time_provider=lambda: system_now,
)
assert service.ingest_sentence(
_sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230426,,")
)
snapshot = service.get_snapshot()
assert len(clock_calls) == 1
assert clock_calls[0] == datetime(2026, 4, 23, 1, 2, 3, tzinfo=timezone.utc)
assert snapshot["time_sync"]["state"] == "synced"
assert snapshot["time_sync"]["last_error"] is None
assert snapshot["time_sync"]["last_gps_time"] == "2026-04-23T01:02:03+00:00"
assert snapshot["time_sync"]["last_offset_seconds"] == 63.0
def test_gps_service_does_not_set_system_time_without_valid_fix():
clock_calls = []
service = GPSService(
{
"gps": {
"enabled": True,
"time_sync_enabled": True,
},
},
clock_setter=clock_calls.append,
)
assert service.ingest_sentence(
_sentence("GPRMC,010203,V,4250.123,N,07106.456,W,000.0,180.0,230426,,")
)
snapshot = service.get_snapshot()
assert clock_calls == []
assert snapshot["status"]["state"] == "invalid_fix"
assert snapshot["time_sync"]["state"] == "waiting_for_fix"
def test_gps_service_ignores_old_gps_time_without_throttling_valid_time():
clock_calls = []
system_now = datetime(2026, 4, 23, 1, 0, 0, tzinfo=timezone.utc).timestamp()
service = GPSService(
{
"gps": {
"enabled": True,
"time_sync_enabled": True,
"time_sync_min_valid_year": 2020,
},
},
clock_setter=clock_calls.append,
time_provider=lambda: system_now,
)
assert service.ingest_sentence(
_sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230394,,")
)
assert clock_calls == []
assert service.get_snapshot()["time_sync"]["state"] == "ignored"
assert service.ingest_sentence(
_sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230426,,")
)
assert clock_calls == [datetime(2026, 4, 23, 1, 2, 3, tzinfo=timezone.utc)]
assert service.get_snapshot()["time_sync"]["state"] == "synced"
def test_gps_service_file_source_uses_time_sync_hook(tmp_path):
path = tmp_path / "gps_nmea.txt"
path.write_text(
_sentence("GPRMC,010203,A,4250.123,N,07106.456,W,000.0,180.0,230426,,"),
encoding="utf-8",
)
clock_calls = []
system_now = datetime(2026, 4, 23, 1, 0, 0, tzinfo=timezone.utc).timestamp()
service = GPSService(
{
"gps": {
"enabled": True,
"source": "file",
"source_path": str(path),
"poll_interval_seconds": 0.05,
"stale_after_seconds": 5.0,
"time_sync_enabled": True,
}
},
clock_setter=clock_calls.append,
time_provider=lambda: system_now,
)
service.start()
try:
deadline = time.time() + 1.0
while not clock_calls and time.time() < deadline:
time.sleep(0.05)
finally:
service.stop()
assert clock_calls == [datetime(2026, 4, 23, 1, 2, 3, tzinfo=timezone.utc)]
def test_gps_service_uses_gps_location_after_valid_fix():
service = GPSService(
{