From 64ec1a713549159d66adbd8a9d266dc3c044e493 Mon Sep 17 00:00:00 2001 From: Louis King Date: Sun, 7 Dec 2025 23:34:33 +0000 Subject: [PATCH] Receiver nodes now sync contacts to MQTT on every advert received --- AGENTS.md | 14 +++++++ src/meshcore_hub/interface/device.py | 45 +++++++++++++++++++- src/meshcore_hub/interface/mock_device.py | 13 +++++- src/meshcore_hub/interface/receiver.py | 15 +++++++ tests/test_interface/test_receiver.py | 50 +++++++++++++++++++++++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 3351a7b..09a5f76 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -624,6 +624,20 @@ On startup, the receiver performs these initialization steps: 1. Set device clock to current Unix timestamp 2. Send a local (non-flood) advertisement 3. Start automatic message fetching +4. Sync the device's contact database + +### Contact Sync Behavior + +The receiver syncs the device's contact database in two scenarios: + +1. **Startup**: Initial sync when receiver starts +2. **Advertisement Events**: Automatic sync triggered whenever an advertisement is received from the mesh + +Since advertisements are typically received every ~20 minutes, contact sync happens automatically without manual intervention. Each contact from the device is published individually to MQTT: +- Topic: `{prefix}/{device_public_key}/event/contact` +- Payload: `{public_key, adv_name, type}` + +This ensures the collector's database stays current with all nodes discovered on the mesh network. ## References diff --git a/src/meshcore_hub/interface/device.py b/src/meshcore_hub/interface/device.py index 5a9c411..1ebe46e 100644 --- a/src/meshcore_hub/interface/device.py +++ b/src/meshcore_hub/interface/device.py @@ -193,11 +193,24 @@ class BaseMeshCoreDevice(ABC): Triggers a CONTACTS event with all stored contacts from the device. + Note: This should only be called before the event loop is running. + Returns: True if request was sent successfully """ pass + @abstractmethod + def schedule_get_contacts(self) -> bool: + """Schedule a get_contacts request on the event loop. + + This is safe to call from event handlers while the event loop is running. + + Returns: + True if request was scheduled successfully + """ + pass + @abstractmethod def run(self) -> None: """Run the device event loop (blocking).""" @@ -567,7 +580,12 @@ class MeshCoreDevice(BaseMeshCoreDevice): return False def get_contacts(self) -> bool: - """Fetch contacts from device contact database.""" + """Fetch contacts from device contact database. + + Note: This method should only be called before the event loop is running + (e.g., during initialization). For calling during event processing, + use schedule_get_contacts() instead. + """ if not self._connected or not self._mc: logger.error("Cannot get contacts: not connected") return False @@ -584,6 +602,31 @@ class MeshCoreDevice(BaseMeshCoreDevice): logger.error(f"Failed to get contacts: {e}") return False + def schedule_get_contacts(self) -> bool: + """Schedule a get_contacts request on the event loop. + + This is safe to call from event handlers while the event loop is running. + The request is scheduled as a task on the event loop. + + Returns: + True if request was scheduled, False if device not connected + """ + if not self._connected or not self._mc: + logger.error("Cannot get contacts: not connected") + return False + + try: + + async def _get_contacts() -> None: + await self._mc.commands.get_contacts() + + asyncio.run_coroutine_threadsafe(_get_contacts(), self._loop) + logger.info("Scheduled contact sync request") + return True + except Exception as e: + logger.error(f"Failed to schedule get contacts: {e}") + return False + def run(self) -> None: """Run the device event loop.""" self._running = True diff --git a/src/meshcore_hub/interface/mock_device.py b/src/meshcore_hub/interface/mock_device.py index 03b4c17..2c11f55 100644 --- a/src/meshcore_hub/interface/mock_device.py +++ b/src/meshcore_hub/interface/mock_device.py @@ -292,7 +292,10 @@ class MockMeshCoreDevice(BaseMeshCoreDevice): return True def get_contacts(self) -> bool: - """Fetch contacts from mock device contact database.""" + """Fetch contacts from mock device contact database. + + Note: This should only be called before the event loop is running. + """ if not self._connected: logger.error("Cannot get contacts: not connected") return False @@ -318,6 +321,14 @@ class MockMeshCoreDevice(BaseMeshCoreDevice): threading.Thread(target=send_contacts, daemon=True).start() return True + def schedule_get_contacts(self) -> bool: + """Schedule a get_contacts request. + + For the mock device, this is the same as get_contacts() since we + don't have a real async event loop. The contacts are sent via a thread. + """ + return self.get_contacts() + def run(self) -> None: """Run the mock device event loop.""" self._running = True diff --git a/src/meshcore_hub/interface/receiver.py b/src/meshcore_hub/interface/receiver.py index 2383ee5..55778c1 100644 --- a/src/meshcore_hub/interface/receiver.py +++ b/src/meshcore_hub/interface/receiver.py @@ -144,9 +144,24 @@ class Receiver: logger.debug(f"Published {event_name} event to MQTT") + # Trigger contact sync on advertisements + if event_type == EventType.ADVERTISEMENT: + self._sync_contacts() + except Exception as e: logger.error(f"Failed to publish event to MQTT: {e}") + def _sync_contacts(self) -> None: + """Request contact sync from device. + + Called when advertisements are received to ensure contact database + stays current with all nodes on the mesh. + """ + logger.info("Advertisement received, triggering contact sync") + success = self.device.schedule_get_contacts() + if not success: + logger.warning("Contact sync request failed") + def _publish_contacts(self, payload: dict[str, Any]) -> None: """Publish each contact as a separate MQTT message. diff --git a/tests/test_interface/test_receiver.py b/tests/test_interface/test_receiver.py index ff0d376..9e8fad4 100644 --- a/tests/test_interface/test_receiver.py +++ b/tests/test_interface/test_receiver.py @@ -62,6 +62,56 @@ class TestReceiver: # Verify MQTT publish was called mock_mqtt_client.publish_event.assert_called() + def test_receiver_syncs_contacts_on_advertisement( + self, receiver, mock_device, mock_mqtt_client + ): + """Test that receiver syncs contacts when advertisement is received.""" + import time + from unittest.mock import patch + + receiver.start() + + # Patch schedule_get_contacts to track calls + with patch.object( + mock_device, "schedule_get_contacts", return_value=True + ) as mock_get: + # Inject an advertisement event + mock_device.inject_event( + EventType.ADVERTISEMENT, + {"pubkey_prefix": "b" * 64, "adv_name": "TestNode", "type": 1}, + ) + + # Allow time for event processing + time.sleep(0.1) + + # Verify schedule_get_contacts was called + mock_get.assert_called() + + def test_receiver_handles_contact_sync_failure( + self, receiver, mock_device, mock_mqtt_client + ): + """Test that receiver handles contact sync failures gracefully.""" + import time + from unittest.mock import patch + + receiver.start() + + # Patch schedule_get_contacts to return False (failure) + with patch.object( + mock_device, "schedule_get_contacts", return_value=False + ) as mock_get: + # Should not raise exception even if sync fails + mock_device.inject_event( + EventType.ADVERTISEMENT, + {"pubkey_prefix": "c" * 64, "adv_name": "FailNode", "type": 1}, + ) + + # Allow time for event processing + time.sleep(0.1) + + # Verify it was attempted + mock_get.assert_called() + class TestCreateReceiver: """Tests for create_receiver factory function."""