mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-03-28 17:42:56 +01:00
Receiver nodes now sync contacts to MQTT on every advert received
This commit is contained in:
14
AGENTS.md
14
AGENTS.md
@@ -624,6 +624,20 @@ On startup, the receiver performs these initialization steps:
|
||||
1. Set device clock to current Unix timestamp
|
||||
2. Send a local (non-flood) advertisement
|
||||
3. Start automatic message fetching
|
||||
4. Sync the device's contact database
|
||||
|
||||
### Contact Sync Behavior
|
||||
|
||||
The receiver syncs the device's contact database in two scenarios:
|
||||
|
||||
1. **Startup**: Initial sync when receiver starts
|
||||
2. **Advertisement Events**: Automatic sync triggered whenever an advertisement is received from the mesh
|
||||
|
||||
Since advertisements are typically received every ~20 minutes, contact sync happens automatically without manual intervention. Each contact from the device is published individually to MQTT:
|
||||
- Topic: `{prefix}/{device_public_key}/event/contact`
|
||||
- Payload: `{public_key, adv_name, type}`
|
||||
|
||||
This ensures the collector's database stays current with all nodes discovered on the mesh network.
|
||||
|
||||
## References
|
||||
|
||||
|
||||
@@ -193,11 +193,24 @@ class BaseMeshCoreDevice(ABC):
|
||||
|
||||
Triggers a CONTACTS event with all stored contacts from the device.
|
||||
|
||||
Note: This should only be called before the event loop is running.
|
||||
|
||||
Returns:
|
||||
True if request was sent successfully
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def schedule_get_contacts(self) -> bool:
|
||||
"""Schedule a get_contacts request on the event loop.
|
||||
|
||||
This is safe to call from event handlers while the event loop is running.
|
||||
|
||||
Returns:
|
||||
True if request was scheduled successfully
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def run(self) -> None:
|
||||
"""Run the device event loop (blocking)."""
|
||||
@@ -567,7 +580,12 @@ class MeshCoreDevice(BaseMeshCoreDevice):
|
||||
return False
|
||||
|
||||
def get_contacts(self) -> bool:
|
||||
"""Fetch contacts from device contact database."""
|
||||
"""Fetch contacts from device contact database.
|
||||
|
||||
Note: This method should only be called before the event loop is running
|
||||
(e.g., during initialization). For calling during event processing,
|
||||
use schedule_get_contacts() instead.
|
||||
"""
|
||||
if not self._connected or not self._mc:
|
||||
logger.error("Cannot get contacts: not connected")
|
||||
return False
|
||||
@@ -584,6 +602,31 @@ class MeshCoreDevice(BaseMeshCoreDevice):
|
||||
logger.error(f"Failed to get contacts: {e}")
|
||||
return False
|
||||
|
||||
def schedule_get_contacts(self) -> bool:
|
||||
"""Schedule a get_contacts request on the event loop.
|
||||
|
||||
This is safe to call from event handlers while the event loop is running.
|
||||
The request is scheduled as a task on the event loop.
|
||||
|
||||
Returns:
|
||||
True if request was scheduled, False if device not connected
|
||||
"""
|
||||
if not self._connected or not self._mc:
|
||||
logger.error("Cannot get contacts: not connected")
|
||||
return False
|
||||
|
||||
try:
|
||||
|
||||
async def _get_contacts() -> None:
|
||||
await self._mc.commands.get_contacts()
|
||||
|
||||
asyncio.run_coroutine_threadsafe(_get_contacts(), self._loop)
|
||||
logger.info("Scheduled contact sync request")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to schedule get contacts: {e}")
|
||||
return False
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the device event loop."""
|
||||
self._running = True
|
||||
|
||||
@@ -292,7 +292,10 @@ class MockMeshCoreDevice(BaseMeshCoreDevice):
|
||||
return True
|
||||
|
||||
def get_contacts(self) -> bool:
|
||||
"""Fetch contacts from mock device contact database."""
|
||||
"""Fetch contacts from mock device contact database.
|
||||
|
||||
Note: This should only be called before the event loop is running.
|
||||
"""
|
||||
if not self._connected:
|
||||
logger.error("Cannot get contacts: not connected")
|
||||
return False
|
||||
@@ -318,6 +321,14 @@ class MockMeshCoreDevice(BaseMeshCoreDevice):
|
||||
threading.Thread(target=send_contacts, daemon=True).start()
|
||||
return True
|
||||
|
||||
def schedule_get_contacts(self) -> bool:
|
||||
"""Schedule a get_contacts request.
|
||||
|
||||
For the mock device, this is the same as get_contacts() since we
|
||||
don't have a real async event loop. The contacts are sent via a thread.
|
||||
"""
|
||||
return self.get_contacts()
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the mock device event loop."""
|
||||
self._running = True
|
||||
|
||||
@@ -144,9 +144,24 @@ class Receiver:
|
||||
|
||||
logger.debug(f"Published {event_name} event to MQTT")
|
||||
|
||||
# Trigger contact sync on advertisements
|
||||
if event_type == EventType.ADVERTISEMENT:
|
||||
self._sync_contacts()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to publish event to MQTT: {e}")
|
||||
|
||||
def _sync_contacts(self) -> None:
|
||||
"""Request contact sync from device.
|
||||
|
||||
Called when advertisements are received to ensure contact database
|
||||
stays current with all nodes on the mesh.
|
||||
"""
|
||||
logger.info("Advertisement received, triggering contact sync")
|
||||
success = self.device.schedule_get_contacts()
|
||||
if not success:
|
||||
logger.warning("Contact sync request failed")
|
||||
|
||||
def _publish_contacts(self, payload: dict[str, Any]) -> None:
|
||||
"""Publish each contact as a separate MQTT message.
|
||||
|
||||
|
||||
@@ -62,6 +62,56 @@ class TestReceiver:
|
||||
# Verify MQTT publish was called
|
||||
mock_mqtt_client.publish_event.assert_called()
|
||||
|
||||
def test_receiver_syncs_contacts_on_advertisement(
|
||||
self, receiver, mock_device, mock_mqtt_client
|
||||
):
|
||||
"""Test that receiver syncs contacts when advertisement is received."""
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
receiver.start()
|
||||
|
||||
# Patch schedule_get_contacts to track calls
|
||||
with patch.object(
|
||||
mock_device, "schedule_get_contacts", return_value=True
|
||||
) as mock_get:
|
||||
# Inject an advertisement event
|
||||
mock_device.inject_event(
|
||||
EventType.ADVERTISEMENT,
|
||||
{"pubkey_prefix": "b" * 64, "adv_name": "TestNode", "type": 1},
|
||||
)
|
||||
|
||||
# Allow time for event processing
|
||||
time.sleep(0.1)
|
||||
|
||||
# Verify schedule_get_contacts was called
|
||||
mock_get.assert_called()
|
||||
|
||||
def test_receiver_handles_contact_sync_failure(
|
||||
self, receiver, mock_device, mock_mqtt_client
|
||||
):
|
||||
"""Test that receiver handles contact sync failures gracefully."""
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
receiver.start()
|
||||
|
||||
# Patch schedule_get_contacts to return False (failure)
|
||||
with patch.object(
|
||||
mock_device, "schedule_get_contacts", return_value=False
|
||||
) as mock_get:
|
||||
# Should not raise exception even if sync fails
|
||||
mock_device.inject_event(
|
||||
EventType.ADVERTISEMENT,
|
||||
{"pubkey_prefix": "c" * 64, "adv_name": "FailNode", "type": 1},
|
||||
)
|
||||
|
||||
# Allow time for event processing
|
||||
time.sleep(0.1)
|
||||
|
||||
# Verify it was attempted
|
||||
mock_get.assert_called()
|
||||
|
||||
|
||||
class TestCreateReceiver:
|
||||
"""Tests for create_receiver factory function."""
|
||||
|
||||
Reference in New Issue
Block a user