diff --git a/app/fanout/manager.py b/app/fanout/manager.py index d58abbf..bf4ab16 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -21,6 +21,7 @@ def _register_module_types() -> None: return from app.fanout.apprise_mod import AppriseModule from app.fanout.bot import BotModule + from app.fanout.map_upload import MapUploadModule from app.fanout.mqtt_community import MqttCommunityModule from app.fanout.mqtt_private import MqttPrivateModule from app.fanout.sqs import SqsModule @@ -32,6 +33,7 @@ def _register_module_types() -> None: _MODULE_TYPES["webhook"] = WebhookModule _MODULE_TYPES["apprise"] = AppriseModule _MODULE_TYPES["sqs"] = SqsModule + _MODULE_TYPES["map_upload"] = MapUploadModule def _matches_filter(filter_value: Any, key: str) -> bool: diff --git a/app/fanout/map_upload.py b/app/fanout/map_upload.py new file mode 100644 index 0000000..5d29223 --- /dev/null +++ b/app/fanout/map_upload.py @@ -0,0 +1,270 @@ +"""Fanout module for uploading heard advert packets to map.meshcore.dev. + +Mirrors the logic of the standalone map.meshcore.dev-uploader project: +- Listens on raw RF packets via on_raw +- Filters for ADVERT packets, skips CHAT nodes (device_role == 1) +- Applies per-pubkey rate-limiting (1-hour window, matching the uploader) +- Signs the upload request with the radio's own Ed25519 private key +- POSTs to the map API (or logs in dry-run mode) + +Dry-run mode (default: True) logs the full would-be payload at INFO level +without making any HTTP requests. Disable it only after verifying the log +output looks correct — in particular the radio params (freq/bw/sf/cr) and +the raw hex link. +""" + +from __future__ import annotations + +import hashlib +import json +import logging + +import httpx + +from app.decoder import parse_advertisement, parse_packet +from app.fanout.base import FanoutModule +from app.keystore import get_private_key, get_public_key +from app.services.radio_runtime import radio_runtime + +logger = logging.getLogger(__name__) + +_DEFAULT_API_URL = "https://map.meshcore.dev/api/v1/uploader/node" + +# Re-upload guard: skip re-uploading a pubkey seen within this window (AU parity) +_REUPLOAD_SECONDS = 3600 + +# Device role 1 = CHAT — skip these; repeaters (2) and rooms (3) are the map targets +_SKIP_DEVICE_ROLES = {1} + +# Ed25519 group order (L) +_L = 2**252 + 27742317777372353535851937790883648493 + + +def _ed25519_sign_expanded( + message: bytes, scalar: bytes, prefix: bytes, public_key: bytes +) -> bytes: + """Sign using MeshCore's expanded Ed25519 key format. + + MeshCore stores 64-byte keys as scalar(32) || prefix(32). Standard + Ed25519 libraries expect seed format and would re-SHA-512 the key, so + we perform the signing manually using the already-expanded key material. + + Mirrors the implementation in app/fanout/community_mqtt.py. + """ + import nacl.bindings + + r = int.from_bytes(hashlib.sha512(prefix + message).digest(), "little") % _L + R = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(r.to_bytes(32, "little")) + k = int.from_bytes(hashlib.sha512(R + public_key + message).digest(), "little") % _L + s = (r + k * int.from_bytes(scalar, "little")) % _L + return R + s.to_bytes(32, "little") + + +def _get_radio_params() -> dict: + """Read radio frequency parameters from the connected radio's self_info. + + The standalone AU divides raw freq/bw values by 1000 before sending. + The meshcore Python library returns radio_freq and radio_bw at the same + scale as the JS library, so we apply the same /1000 division. + + IMPORTANT: verify the actual values in dry_run logs before enabling live + sends. The units reported by the Python library should be confirmed; if + the logged freq/bw look wrong (e.g. 0.915 instead of 915000), the + division factor here may need adjusting. + """ + try: + mc = radio_runtime.meshcore + if not mc: + return {"freq": 0, "cr": 0, "sf": 0, "bw": 0} + info = mc.self_info + if not isinstance(info, dict): + return {"freq": 0, "cr": 0, "sf": 0, "bw": 0} + freq = info.get("radio_freq", 0) or 0 + bw = info.get("radio_bw", 0) or 0 + sf = info.get("radio_sf", 0) or 0 + cr = info.get("radio_cr", 0) or 0 + return { + "freq": freq / 1000.0 if freq else 0, + "cr": cr, + "sf": sf, + "bw": bw / 1000.0 if bw else 0, + } + except Exception: + pass + return {"freq": 0, "cr": 0, "sf": 0, "bw": 0} + + +_ROLE_NAMES: dict[int, str] = {2: "repeater", 3: "room", 4: "sensor"} + + +class MapUploadModule(FanoutModule): + """Uploads heard ADVERT packets to the MeshCore community map.""" + + def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: + super().__init__(config_id, config, name=name) + self._client: httpx.AsyncClient | None = None + self._last_error: str | None = None + # Per-pubkey rate limiting: pubkey_hex -> last_uploaded_advert_timestamp + self._seen: dict[str, int] = {} + + async def start(self) -> None: + self._client = httpx.AsyncClient(timeout=httpx.Timeout(15.0)) + self._last_error = None + self._seen.clear() + + async def stop(self) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def on_raw(self, data: dict) -> None: + if data.get("payload_type") != "ADVERT": + return + + raw_hex = data.get("data", "") + if not raw_hex: + return + + try: + raw_bytes = bytes.fromhex(raw_hex) + except ValueError: + return + + packet_info = parse_packet(raw_bytes) + if packet_info is None: + return + + advert = parse_advertisement(packet_info.payload, raw_packet=raw_bytes) + if advert is None: + return + + # TODO: advert Ed25519 signature verification is skipped here. + # The radio has already validated the packet before passing it to RT, + # so re-verification is redundant in practice. If added, verify that + # nacl.bindings.crypto_sign_open(sig + (pubkey_bytes || timestamp_bytes), + # advert.public_key_bytes) succeeds before proceeding. + + # Skip CHAT-type nodes (role 1) — map only shows repeaters (2) and rooms (3) + if advert.device_role in _SKIP_DEVICE_ROLES: + return + + pubkey = advert.public_key.lower() + + # Rate-limit: skip if this pubkey's timestamp hasn't advanced enough + last_seen = self._seen.get(pubkey) + if last_seen is not None: + if last_seen >= advert.timestamp: + logger.debug( + "MapUpload: skipping %s — possible replay (last=%d, advert=%d)", + pubkey[:12], + last_seen, + advert.timestamp, + ) + return + if advert.timestamp < last_seen + _REUPLOAD_SECONDS: + logger.debug( + "MapUpload: skipping %s — within 1-hr rate-limit window (delta=%ds)", + pubkey[:12], + advert.timestamp - last_seen, + ) + return + + await self._upload(pubkey, advert.timestamp, advert.device_role, raw_hex) + + async def _upload( + self, + pubkey: str, + advert_timestamp: int, + device_role: int, + raw_hex: str, + ) -> None: + private_key = get_private_key() + public_key = get_public_key() + + if private_key is None or public_key is None: + logger.warning( + "MapUpload: private key not available — cannot sign upload for %s. " + "Ensure radio firmware has ENABLE_PRIVATE_KEY_EXPORT=1.", + pubkey[:12], + ) + return + + api_url = str(self.config.get("api_url", "") or _DEFAULT_API_URL).strip() + dry_run = bool(self.config.get("dry_run", True)) + role_name = _ROLE_NAMES.get(device_role, f"role={device_role}") + + params = _get_radio_params() + upload_data = { + "params": params, + "links": [f"meshcore://{raw_hex}"], + } + + # Sign: SHA-256 the compact JSON, then Ed25519-sign the hash + json_str = json.dumps(upload_data, separators=(",", ":")) + data_hash = hashlib.sha256(json_str.encode()).digest() + scalar = private_key[:32] + prefix_bytes = private_key[32:] + signature = _ed25519_sign_expanded(data_hash, scalar, prefix_bytes, public_key) + + request_payload = { + "data": json_str, + "signature": signature.hex(), + "publicKey": public_key.hex(), + } + + if dry_run: + logger.info( + "MapUpload [DRY RUN] %s (%s) → would POST to %s\n payload: %s", + pubkey[:12], + role_name, + api_url, + json.dumps(request_payload, separators=(",", ":")), + ) + # Still update _seen so rate-limiting works during dry-run testing + self._seen[pubkey] = advert_timestamp + return + + if not self._client: + return + + try: + resp = await self._client.post( + api_url, + content=json.dumps(request_payload, separators=(",", ":")), + headers={"Content-Type": "application/json"}, + ) + resp.raise_for_status() + self._seen[pubkey] = advert_timestamp + self._last_error = None + logger.info( + "MapUpload: uploaded %s (%s) → HTTP %d", + pubkey[:12], + role_name, + resp.status_code, + ) + except httpx.HTTPStatusError as exc: + self._last_error = f"HTTP {exc.response.status_code}" + logger.warning( + "MapUpload: server returned %d for %s: %s", + exc.response.status_code, + pubkey[:12], + exc.response.text[:200], + ) + except httpx.RequestError as exc: + self._last_error = str(exc) + logger.warning("MapUpload: request error for %s: %s", pubkey[:12], exc) + + @property + def status(self) -> str: + if self._client is None: + return "disconnected" + if self._last_error: + return "error" + return "connected" + + + + + + + diff --git a/app/routers/fanout.py b/app/routers/fanout.py index d5c8cb7..da0c9c8 100644 --- a/app/routers/fanout.py +++ b/app/routers/fanout.py @@ -16,7 +16,7 @@ from app.repository.fanout import FanoutConfigRepository logger = logging.getLogger(__name__) router = APIRouter(prefix="/fanout", tags=["fanout"]) -_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs"} +_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs", "map_upload"} _IATA_RE = re.compile(r"^[A-Z]{3}$") _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets" @@ -94,6 +94,8 @@ def _validate_and_normalize_config(config_type: str, config: dict) -> dict: _validate_apprise_config(normalized) elif config_type == "sqs": _validate_sqs_config(normalized) + elif config_type == "map_upload": + _validate_map_upload_config(normalized) return normalized @@ -295,10 +297,25 @@ def _validate_sqs_config(config: dict) -> None: ) +def _validate_map_upload_config(config: dict) -> None: + """Validate and normalize map_upload config blob.""" + api_url = str(config.get("api_url", "")).strip() + if api_url and not api_url.startswith(("http://", "https://")): + raise HTTPException( + status_code=400, + detail="api_url must start with http:// or https://", + ) + # Persist the cleaned value (empty string means use the module default) + config["api_url"] = api_url + config["dry_run"] = bool(config.get("dry_run", True)) + + def _enforce_scope(config_type: str, scope: dict) -> dict: """Enforce type-specific scope constraints. Returns normalized scope.""" if config_type == "mqtt_community": return {"messages": "none", "raw_packets": "all"} + if config_type == "map_upload": + return {"messages": "none", "raw_packets": "all"} if config_type == "bot": return {"messages": "all", "raw_packets": "none"} if config_type in ("webhook", "apprise"): diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index c055c46..9357640 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -21,8 +21,10 @@ const TYPE_LABELS: Record = { webhook: 'Webhook', apprise: 'Apprise', sqs: 'Amazon SQS', + map_upload: 'Map Upload', }; + const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets'; const DEFAULT_COMMUNITY_BROKER_HOST = 'mqtt-us-v1.letsmesh.net'; const DEFAULT_COMMUNITY_BROKER_HOST_EU = 'mqtt-eu-v1.letsmesh.net'; @@ -35,6 +37,7 @@ const DEFAULT_MESHRANK_TRANSPORT = 'tcp'; const DEFAULT_MESHRANK_AUTH_MODE = 'none'; const DEFAULT_MESHRANK_IATA = 'XYZ'; + function createCommunityConfigDefaults( overrides: Partial> = {} ): Record { @@ -100,7 +103,8 @@ type DraftType = | 'webhook' | 'apprise' | 'sqs' - | 'bot'; + | 'bot' + | 'map_upload'; type CreateIntegrationDefinition = { value: DraftType; @@ -284,6 +288,23 @@ const CREATE_INTEGRATION_DEFINITIONS: readonly CreateIntegrationDefinition[] = [ scope: { messages: 'all', raw_packets: 'none' }, }, }, + { + value: 'map_upload', + savedType: 'map_upload', + label: 'Map Upload', + section: 'Bulk Forwarding', + description: + 'Upload node positions to map.meshcore.dev or a compatible map API endpoint.', + defaultName: 'Map Upload', + nameMode: 'counted', + defaults: { + config: { + api_url: '', + dry_run: true, + }, + scope: { messages: 'none', raw_packets: 'all' }, + }, + }, ]; const CREATE_INTEGRATION_DEFINITIONS_BY_VALUE = Object.fromEntries( @@ -566,7 +587,9 @@ function getDefaultIntegrationName(type: string, configs: FanoutConfig[]) { function getStatusLabel(status: string | undefined, type?: string) { if (status === 'connected') - return type === 'bot' || type === 'webhook' || type === 'apprise' ? 'Active' : 'Connected'; + return type === 'bot' || type === 'webhook' || type === 'apprise' || type === 'map_upload' + ? 'Active' + : 'Connected'; if (status === 'error') return 'Error'; if (status === 'disconnected') return 'Disconnected'; return 'Inactive'; @@ -1059,6 +1082,73 @@ function BotConfigEditor({ ); } +function MapUploadConfigEditor({ + config, + onChange, +}: { + config: Record; + onChange: (config: Record) => void; +}) { + const isDryRun = config.dry_run !== false; + return ( +
+

+ Automatically upload heard repeater and room advertisements to{' '} + + map.meshcore.dev + + . Requires the radio's private key to be available (firmware must have{' '} + ENABLE_PRIVATE_KEY_EXPORT=1). Only raw RF packets are shared — never + decrypted messages. +

+ +
+ Dry Run is {isDryRun ? 'ON' : 'OFF'}.{' '} + {isDryRun + ? 'No uploads will be sent. Check the backend logs to verify the payload looks correct before enabling live sends.' + : 'Live uploads are enabled. Each advert is rate-limited to once per hour per node.'} +
+ + + + + +
+ + onChange({ ...config, api_url: e.target.value })} + /> +

+ Leave blank to use the default map.meshcore.dev endpoint. +

+
+
+ ); +} + type ScopeMode = 'all' | 'none' | 'only' | 'except'; function getScopeMode(value: unknown): ScopeMode { @@ -1975,6 +2065,10 @@ export function SettingsFanoutSection({ /> )} + {detailType === 'map_upload' && ( + + )} +
diff --git a/tests/test_fanout_integration.py b/tests/test_fanout_integration.py index 81f98fe..9d14400 100644 --- a/tests/test_fanout_integration.py +++ b/tests/test_fanout_integration.py @@ -1790,3 +1790,113 @@ class TestManagerRestartFailure: assert len(healthy.messages_received) == 1 assert len(dead.messages_received) == 0 + + +# --------------------------------------------------------------------------- +# MapUploadModule integration tests +# --------------------------------------------------------------------------- + + +class TestMapUploadIntegration: + """Integration tests: FanoutManager loads and dispatches to MapUploadModule.""" + + @pytest.mark.asyncio + async def test_map_upload_module_loaded_and_receives_raw(self, integration_db): + """Enabled map_upload config is loaded by the manager and its on_raw is called.""" + from unittest.mock import AsyncMock, MagicMock, patch + + cfg = await FanoutConfigRepository.create( + config_type="map_upload", + name="Map", + config={"dry_run": True, "api_url": ""}, + scope={"messages": "none", "raw_packets": "all"}, + enabled=True, + ) + + manager = FanoutManager() + await manager.load_from_db() + + assert cfg["id"] in manager._modules + module, scope = manager._modules[cfg["id"]] + assert scope == {"messages": "none", "raw_packets": "all"} + + # Raw ADVERT event should be dispatched to on_raw + advert_data = { + "payload_type": "ADVERT", + "data": "aabbccdd", + "timestamp": 1000, + "id": 1, + "observation_id": 1, + } + + with patch.object(module, "_upload", new_callable=AsyncMock) as mock_upload: + # Provide a parseable but minimal packet so on_raw gets past hex decode; + # parse_packet/parse_advertisement returning None is fine — on_raw silently exits + await manager.broadcast_raw(advert_data) + # Give the asyncio task a chance to run + import asyncio + await asyncio.sleep(0.05) + # _upload may or may not be called depending on parse result, but no exception + + await manager.stop_all() + + @pytest.mark.asyncio + async def test_map_upload_disabled_not_loaded(self, integration_db): + """Disabled map_upload config is not loaded by the manager.""" + await FanoutConfigRepository.create( + config_type="map_upload", + name="Map Disabled", + config={"dry_run": True, "api_url": ""}, + scope={"messages": "none", "raw_packets": "all"}, + enabled=False, + ) + + manager = FanoutManager() + await manager.load_from_db() + + assert len(manager._modules) == 0 + await manager.stop_all() + + @pytest.mark.asyncio + async def test_map_upload_does_not_receive_messages(self, integration_db): + """map_upload scope forces raw_packets only — message events must not reach it.""" + from unittest.mock import AsyncMock, patch + + cfg = await FanoutConfigRepository.create( + config_type="map_upload", + name="Map", + config={"dry_run": True, "api_url": ""}, + scope={"messages": "none", "raw_packets": "all"}, + enabled=True, + ) + + manager = FanoutManager() + await manager.load_from_db() + + assert cfg["id"] in manager._modules + module, _ = manager._modules[cfg["id"]] + + with patch.object(module, "on_message", new_callable=AsyncMock) as mock_msg: + await manager.broadcast_message({"type": "CHAN", "conversation_key": "k1", "text": "hi"}) + import asyncio + await asyncio.sleep(0.05) + mock_msg.assert_not_called() + + await manager.stop_all() + + @pytest.mark.asyncio + async def test_map_upload_scope_enforced_on_create(self, integration_db): + """Scope for map_upload is always fixed to raw_packets: all, messages: none.""" + # Even if a custom scope is passed, the router enforces the correct one. + # Here we verify the DB record has the enforced scope. + cfg = await FanoutConfigRepository.create( + config_type="map_upload", + name="Map", + config={"dry_run": True, "api_url": ""}, + scope={"messages": "all", "raw_packets": "none"}, # wrong, should be overridden by router + enabled=True, + ) + # The repository stores whatever the router passes — we test the router via HTTP + # in test_api.py; here we just verify the module works with the correct scope. + assert cfg["type"] == "map_upload" + diff --git a/tests/test_map_upload.py b/tests/test_map_upload.py new file mode 100644 index 0000000..28e0e8e --- /dev/null +++ b/tests/test_map_upload.py @@ -0,0 +1,659 @@ +"""Unit tests for the MapUploadModule fanout module.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.fanout.map_upload import ( + MapUploadModule, + _DEFAULT_API_URL, + _REUPLOAD_SECONDS, + _get_radio_params, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_module(config: dict | None = None) -> MapUploadModule: + cfg = {"dry_run": True, "api_url": ""} + if config: + cfg.update(config) + return MapUploadModule("test-id", cfg, name="Test Map Upload") + + +def _advert_raw_data(payload_type: str = "ADVERT", raw_hex: str = "aabbccdd") -> dict: + return { + "payload_type": payload_type, + "data": raw_hex, + "timestamp": 1000, + "id": 1, + "observation_id": 1, + } + + +def _fake_advert(device_role: int = 2, timestamp: int = 2000, pubkey: str | None = None) -> MagicMock: + advert = MagicMock() + advert.device_role = device_role + advert.timestamp = timestamp + advert.public_key = pubkey or "ab" * 32 + return advert + + +# --------------------------------------------------------------------------- +# Module lifecycle +# --------------------------------------------------------------------------- + + +class TestMapUploadLifecycle: + @pytest.mark.asyncio + async def test_start_creates_client(self): + mod = _make_module() + await mod.start() + assert mod._client is not None + assert mod.status == "connected" + await mod.stop() + + @pytest.mark.asyncio + async def test_stop_clears_client(self): + mod = _make_module() + await mod.start() + await mod.stop() + assert mod._client is None + assert mod.status == "disconnected" + + @pytest.mark.asyncio + async def test_start_clears_seen_table(self): + mod = _make_module() + mod._seen["somepubkey"] = 999 + await mod.start() + assert mod._seen == {} + await mod.stop() + + def test_status_error_when_last_error_set(self): + mod = _make_module() + mod._client = MagicMock() + mod._last_error = "HTTP 500" + assert mod.status == "error" + + +# --------------------------------------------------------------------------- +# on_raw filtering +# --------------------------------------------------------------------------- + + +class TestOnRawFiltering: + @pytest.mark.asyncio + async def test_non_advert_packet_ignored(self): + mod = _make_module() + await mod.start() + + with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload: + await mod.on_raw(_advert_raw_data(payload_type="GROUP_TEXT")) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_empty_data_ignored(self): + mod = _make_module() + await mod.start() + + with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload: + await mod.on_raw({"payload_type": "ADVERT", "data": ""}) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_invalid_hex_ignored(self): + mod = _make_module() + await mod.start() + + with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload: + await mod.on_raw({"payload_type": "ADVERT", "data": "ZZZZ"}) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_parse_failure_ignored(self): + mod = _make_module() + await mod.start() + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=None), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_advert_parse_failure_ignored(self): + mod = _make_module() + await mod.start() + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 10 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch("app.fanout.map_upload.parse_advertisement", return_value=None), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_chat_advert_skipped(self): + """device_role == 1 (CHAT) must be skipped.""" + mod = _make_module() + await mod.start() + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=1), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_repeater_advert_processed(self): + """device_role == 2 (Repeater) must be uploaded.""" + mod = _make_module() + await mod.start() + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=2), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_called_once() + + await mod.stop() + + @pytest.mark.asyncio + async def test_room_advert_processed(self): + """device_role == 3 (Room) must be uploaded.""" + mod = _make_module() + await mod.start() + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=3), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_called_once() + + await mod.stop() + + +# --------------------------------------------------------------------------- +# Rate limiting +# --------------------------------------------------------------------------- + + +class TestRateLimiting: + @pytest.mark.asyncio + async def test_first_seen_pubkey_passes(self): + mod = _make_module() + await mod.start() + + pubkey = "ab" * 32 + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=2, timestamp=5000, pubkey=pubkey), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_called_once() + + await mod.stop() + + @pytest.mark.asyncio + async def test_replay_skipped(self): + """Same or older timestamp should be skipped.""" + mod = _make_module() + await mod.start() + + pubkey = "ab" * 32 + mod._seen[pubkey] = 5000 # already uploaded at ts=5000 + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=2, timestamp=5000, pubkey=pubkey), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_within_rate_limit_window_skipped(self): + """Newer timestamp but within 1-hr window should be skipped.""" + mod = _make_module() + await mod.start() + + pubkey = "ab" * 32 + last_ts = 5000 + mod._seen[pubkey] = last_ts + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + # 30 minutes later — still within the 1-hour window + new_ts = last_ts + (_REUPLOAD_SECONDS // 2) + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=2, timestamp=new_ts, pubkey=pubkey), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_after_rate_limit_window_passes(self): + """Timestamp beyond the 1-hr window should be uploaded again.""" + mod = _make_module() + await mod.start() + + pubkey = "ab" * 32 + last_ts = 5000 + mod._seen[pubkey] = last_ts + + mock_packet = MagicMock() + mock_packet.payload = b"\x00" * 101 + + new_ts = last_ts + _REUPLOAD_SECONDS + 1 + + with ( + patch("app.fanout.map_upload.parse_packet", return_value=mock_packet), + patch( + "app.fanout.map_upload.parse_advertisement", + return_value=_fake_advert(device_role=2, timestamp=new_ts, pubkey=pubkey), + ), + patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload, + ): + await mod.on_raw(_advert_raw_data()) + mock_upload.assert_called_once() + + await mod.stop() + + +# --------------------------------------------------------------------------- +# Dry run behaviour +# --------------------------------------------------------------------------- + + +class TestDryRun: + @pytest.mark.asyncio + async def test_dry_run_logs_but_does_not_post(self): + """dry_run=True must log the payload but never call httpx.""" + mod = _make_module({"dry_run": True}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}), + ): + assert mod._client is not None + post_mock = AsyncMock() + mod._client.post = post_mock # type: ignore[method-assign] + + await mod._upload("ab" * 32, 1000, 2, "aabbccdd") + + post_mock.assert_not_called() + + await mod.stop() + + @pytest.mark.asyncio + async def test_dry_run_updates_seen_table(self): + """dry_run still records the pubkey so rate-limiting works.""" + mod = _make_module({"dry_run": True}) + await mod.start() + + pubkey = "ab" * 32 + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + await mod._upload(pubkey, 9999, 2, "aabb") + assert mod._seen[pubkey] == 9999 + + await mod.stop() + + @pytest.mark.asyncio + async def test_dry_run_no_key_logs_warning_and_returns(self): + """If private key is missing, upload should log a warning and not crash.""" + mod = _make_module({"dry_run": True}) + await mod.start() + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=None), + patch("app.fanout.map_upload.get_public_key", return_value=None), + ): + # Should not raise + await mod._upload("ab" * 32, 1000, 2, "aabb") + assert mod._seen == {} + + await mod.stop() + + +# --------------------------------------------------------------------------- +# Live send behaviour +# --------------------------------------------------------------------------- + + +class TestLiveSend: + @pytest.mark.asyncio + async def test_live_send_posts_to_api_url(self): + """dry_run=False should POST to the configured api_url.""" + custom_url = "https://custom.example.com/api/upload" + mod = _make_module({"dry_run": False, "api_url": custom_url}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}), + ): + assert mod._client is not None + post_mock = AsyncMock(return_value=mock_response) + mod._client.post = post_mock # type: ignore[method-assign] + + await mod._upload("ab" * 32, 1000, 2, "aabbccdd") + + post_mock.assert_called_once() + call_url = post_mock.call_args[0][0] + assert call_url == custom_url + + await mod.stop() + + @pytest.mark.asyncio + async def test_live_send_defaults_to_map_url(self): + """Empty api_url should default to the map.meshcore.dev endpoint.""" + mod = _make_module({"dry_run": False, "api_url": ""}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + assert mod._client is not None + post_mock = AsyncMock(return_value=mock_response) + mod._client.post = post_mock # type: ignore[method-assign] + + await mod._upload("ab" * 32, 1000, 2, "aabb") + + call_url = post_mock.call_args[0][0] + assert call_url == _DEFAULT_API_URL + + await mod.stop() + + @pytest.mark.asyncio + async def test_live_send_updates_seen_on_success(self): + mod = _make_module({"dry_run": False}) + await mod.start() + + pubkey = "cd" * 32 + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + assert mod._client is not None + mod._client.post = AsyncMock(return_value=mock_response) # type: ignore[method-assign] + await mod._upload(pubkey, 7777, 2, "aabb") + assert mod._seen[pubkey] == 7777 + + await mod.stop() + + @pytest.mark.asyncio + async def test_live_send_http_error_sets_last_error(self): + import httpx + + mod = _make_module({"dry_run": False}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + error_response = MagicMock() + error_response.status_code = 500 + error_response.text = "Internal Server Error" + exc = httpx.HTTPStatusError("500", request=MagicMock(), response=error_response) + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + assert mod._client is not None + mod._client.post = AsyncMock(side_effect=exc) # type: ignore[method-assign] + await mod._upload("ab" * 32, 1000, 2, "aabb") + assert mod._last_error == "HTTP 500" + assert mod.status == "error" + + await mod.stop() + + @pytest.mark.asyncio + async def test_live_send_request_error_sets_last_error(self): + import httpx + + mod = _make_module({"dry_run": False}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + assert mod._client is not None + mod._client.post = AsyncMock(side_effect=httpx.ConnectError("conn refused")) # type: ignore[method-assign] + await mod._upload("ab" * 32, 1000, 2, "aabb") + assert mod._last_error is not None + assert mod.status == "error" + + await mod.stop() + + +# --------------------------------------------------------------------------- +# Payload structure +# --------------------------------------------------------------------------- + + +class TestPayloadStructure: + @pytest.mark.asyncio + async def test_request_payload_has_required_fields(self): + """The POST body must contain data, signature, and publicKey.""" + mod = _make_module({"dry_run": False}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + captured: list[dict] = [] + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + + async def capture_post(url, *, content, headers): + captured.append(json.loads(content)) + return mock_response + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}), + ): + assert mod._client is not None + mod._client.post = capture_post # type: ignore[method-assign] + await mod._upload("ab" * 32, 1000, 2, "aabbccdd") + + assert len(captured) == 1 + payload = captured[0] + assert "data" in payload + assert "signature" in payload + assert "publicKey" in payload + + # data field should be parseable JSON with params and links + inner = json.loads(payload["data"]) + assert "params" in inner + assert "links" in inner + assert len(inner["links"]) == 1 + assert inner["links"][0] == "meshcore://aabbccdd" + + # links reference the raw hex as-is + assert inner["params"]["freq"] == 915 + assert inner["params"]["sf"] == 10 + + await mod.stop() + + @pytest.mark.asyncio + async def test_public_key_hex_in_payload(self): + mod = _make_module({"dry_run": False}) + await mod.start() + + fake_private = bytes(range(64)) + fake_public = bytes(range(32)) + captured: list[dict] = [] + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + + async def capture_post(url, *, content, headers): + captured.append(json.loads(content)) + return mock_response + + with ( + patch("app.fanout.map_upload.get_private_key", return_value=fake_private), + patch("app.fanout.map_upload.get_public_key", return_value=fake_public), + patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}), + ): + assert mod._client is not None + mod._client.post = capture_post # type: ignore[method-assign] + await mod._upload("ab" * 32, 1000, 2, "ff") + + assert captured[0]["publicKey"] == fake_public.hex() + + await mod.stop() + + +# --------------------------------------------------------------------------- +# _get_radio_params +# --------------------------------------------------------------------------- + + +class TestGetRadioParams: + def test_returns_zeros_when_radio_not_connected(self): + with patch("app.fanout.map_upload.radio_runtime") as mock_rt: + mock_rt.meshcore = None + params = _get_radio_params() + assert params == {"freq": 0, "cr": 0, "sf": 0, "bw": 0} + + def test_returns_zeros_on_exception(self): + with patch("app.fanout.map_upload.radio_runtime", side_effect=Exception("boom")): + params = _get_radio_params() + assert params == {"freq": 0, "cr": 0, "sf": 0, "bw": 0} + + def test_divides_freq_and_bw_by_1000(self): + mock_rt = MagicMock() + mock_rt.meshcore.self_info = { + "radio_freq": 915000, + "radio_bw": 125000, + "radio_sf": 10, + "radio_cr": 5, + } + with patch("app.fanout.map_upload.radio_runtime", mock_rt): + params = _get_radio_params() + assert params["freq"] == 915.0 + assert params["bw"] == 125.0 + assert params["sf"] == 10 + assert params["cr"] == 5 + +