Add Map Upload Integration and tests

This commit is contained in:
Kyle
2026-03-24 11:09:16 -04:00
committed by Kizniche
parent caf4bf4eff
commit 6eab75ec7e
6 changed files with 1155 additions and 3 deletions

View File

@@ -21,6 +21,7 @@ def _register_module_types() -> None:
return
from app.fanout.apprise_mod import AppriseModule
from app.fanout.bot import BotModule
from app.fanout.map_upload import MapUploadModule
from app.fanout.mqtt_community import MqttCommunityModule
from app.fanout.mqtt_private import MqttPrivateModule
from app.fanout.sqs import SqsModule
@@ -32,6 +33,7 @@ def _register_module_types() -> None:
_MODULE_TYPES["webhook"] = WebhookModule
_MODULE_TYPES["apprise"] = AppriseModule
_MODULE_TYPES["sqs"] = SqsModule
_MODULE_TYPES["map_upload"] = MapUploadModule
def _matches_filter(filter_value: Any, key: str) -> bool:

270
app/fanout/map_upload.py Normal file
View File

@@ -0,0 +1,270 @@
"""Fanout module for uploading heard advert packets to map.meshcore.dev.
Mirrors the logic of the standalone map.meshcore.dev-uploader project:
- Listens on raw RF packets via on_raw
- Filters for ADVERT packets, skips CHAT nodes (device_role == 1)
- Applies per-pubkey rate-limiting (1-hour window, matching the uploader)
- Signs the upload request with the radio's own Ed25519 private key
- POSTs to the map API (or logs in dry-run mode)
Dry-run mode (default: True) logs the full would-be payload at INFO level
without making any HTTP requests. Disable it only after verifying the log
output looks correct — in particular the radio params (freq/bw/sf/cr) and
the raw hex link.
"""
from __future__ import annotations
import hashlib
import json
import logging
import httpx
from app.decoder import parse_advertisement, parse_packet
from app.fanout.base import FanoutModule
from app.keystore import get_private_key, get_public_key
from app.services.radio_runtime import radio_runtime
logger = logging.getLogger(__name__)
_DEFAULT_API_URL = "https://map.meshcore.dev/api/v1/uploader/node"
# Re-upload guard: skip re-uploading a pubkey seen within this window (AU parity)
_REUPLOAD_SECONDS = 3600
# Device role 1 = CHAT — skip these; repeaters (2) and rooms (3) are the map targets
_SKIP_DEVICE_ROLES = {1}
# Ed25519 group order (L)
_L = 2**252 + 27742317777372353535851937790883648493
def _ed25519_sign_expanded(
message: bytes, scalar: bytes, prefix: bytes, public_key: bytes
) -> bytes:
"""Sign using MeshCore's expanded Ed25519 key format.
MeshCore stores 64-byte keys as scalar(32) || prefix(32). Standard
Ed25519 libraries expect seed format and would re-SHA-512 the key, so
we perform the signing manually using the already-expanded key material.
Mirrors the implementation in app/fanout/community_mqtt.py.
"""
import nacl.bindings
r = int.from_bytes(hashlib.sha512(prefix + message).digest(), "little") % _L
R = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(r.to_bytes(32, "little"))
k = int.from_bytes(hashlib.sha512(R + public_key + message).digest(), "little") % _L
s = (r + k * int.from_bytes(scalar, "little")) % _L
return R + s.to_bytes(32, "little")
def _get_radio_params() -> dict:
"""Read radio frequency parameters from the connected radio's self_info.
The standalone AU divides raw freq/bw values by 1000 before sending.
The meshcore Python library returns radio_freq and radio_bw at the same
scale as the JS library, so we apply the same /1000 division.
IMPORTANT: verify the actual values in dry_run logs before enabling live
sends. The units reported by the Python library should be confirmed; if
the logged freq/bw look wrong (e.g. 0.915 instead of 915000), the
division factor here may need adjusting.
"""
try:
mc = radio_runtime.meshcore
if not mc:
return {"freq": 0, "cr": 0, "sf": 0, "bw": 0}
info = mc.self_info
if not isinstance(info, dict):
return {"freq": 0, "cr": 0, "sf": 0, "bw": 0}
freq = info.get("radio_freq", 0) or 0
bw = info.get("radio_bw", 0) or 0
sf = info.get("radio_sf", 0) or 0
cr = info.get("radio_cr", 0) or 0
return {
"freq": freq / 1000.0 if freq else 0,
"cr": cr,
"sf": sf,
"bw": bw / 1000.0 if bw else 0,
}
except Exception:
pass
return {"freq": 0, "cr": 0, "sf": 0, "bw": 0}
_ROLE_NAMES: dict[int, str] = {2: "repeater", 3: "room", 4: "sensor"}
class MapUploadModule(FanoutModule):
"""Uploads heard ADVERT packets to the MeshCore community map."""
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._client: httpx.AsyncClient | None = None
self._last_error: str | None = None
# Per-pubkey rate limiting: pubkey_hex -> last_uploaded_advert_timestamp
self._seen: dict[str, int] = {}
async def start(self) -> None:
self._client = httpx.AsyncClient(timeout=httpx.Timeout(15.0))
self._last_error = None
self._seen.clear()
async def stop(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def on_raw(self, data: dict) -> None:
if data.get("payload_type") != "ADVERT":
return
raw_hex = data.get("data", "")
if not raw_hex:
return
try:
raw_bytes = bytes.fromhex(raw_hex)
except ValueError:
return
packet_info = parse_packet(raw_bytes)
if packet_info is None:
return
advert = parse_advertisement(packet_info.payload, raw_packet=raw_bytes)
if advert is None:
return
# TODO: advert Ed25519 signature verification is skipped here.
# The radio has already validated the packet before passing it to RT,
# so re-verification is redundant in practice. If added, verify that
# nacl.bindings.crypto_sign_open(sig + (pubkey_bytes || timestamp_bytes),
# advert.public_key_bytes) succeeds before proceeding.
# Skip CHAT-type nodes (role 1) — map only shows repeaters (2) and rooms (3)
if advert.device_role in _SKIP_DEVICE_ROLES:
return
pubkey = advert.public_key.lower()
# Rate-limit: skip if this pubkey's timestamp hasn't advanced enough
last_seen = self._seen.get(pubkey)
if last_seen is not None:
if last_seen >= advert.timestamp:
logger.debug(
"MapUpload: skipping %s — possible replay (last=%d, advert=%d)",
pubkey[:12],
last_seen,
advert.timestamp,
)
return
if advert.timestamp < last_seen + _REUPLOAD_SECONDS:
logger.debug(
"MapUpload: skipping %s — within 1-hr rate-limit window (delta=%ds)",
pubkey[:12],
advert.timestamp - last_seen,
)
return
await self._upload(pubkey, advert.timestamp, advert.device_role, raw_hex)
async def _upload(
self,
pubkey: str,
advert_timestamp: int,
device_role: int,
raw_hex: str,
) -> None:
private_key = get_private_key()
public_key = get_public_key()
if private_key is None or public_key is None:
logger.warning(
"MapUpload: private key not available — cannot sign upload for %s. "
"Ensure radio firmware has ENABLE_PRIVATE_KEY_EXPORT=1.",
pubkey[:12],
)
return
api_url = str(self.config.get("api_url", "") or _DEFAULT_API_URL).strip()
dry_run = bool(self.config.get("dry_run", True))
role_name = _ROLE_NAMES.get(device_role, f"role={device_role}")
params = _get_radio_params()
upload_data = {
"params": params,
"links": [f"meshcore://{raw_hex}"],
}
# Sign: SHA-256 the compact JSON, then Ed25519-sign the hash
json_str = json.dumps(upload_data, separators=(",", ":"))
data_hash = hashlib.sha256(json_str.encode()).digest()
scalar = private_key[:32]
prefix_bytes = private_key[32:]
signature = _ed25519_sign_expanded(data_hash, scalar, prefix_bytes, public_key)
request_payload = {
"data": json_str,
"signature": signature.hex(),
"publicKey": public_key.hex(),
}
if dry_run:
logger.info(
"MapUpload [DRY RUN] %s (%s) → would POST to %s\n payload: %s",
pubkey[:12],
role_name,
api_url,
json.dumps(request_payload, separators=(",", ":")),
)
# Still update _seen so rate-limiting works during dry-run testing
self._seen[pubkey] = advert_timestamp
return
if not self._client:
return
try:
resp = await self._client.post(
api_url,
content=json.dumps(request_payload, separators=(",", ":")),
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
self._seen[pubkey] = advert_timestamp
self._last_error = None
logger.info(
"MapUpload: uploaded %s (%s) → HTTP %d",
pubkey[:12],
role_name,
resp.status_code,
)
except httpx.HTTPStatusError as exc:
self._last_error = f"HTTP {exc.response.status_code}"
logger.warning(
"MapUpload: server returned %d for %s: %s",
exc.response.status_code,
pubkey[:12],
exc.response.text[:200],
)
except httpx.RequestError as exc:
self._last_error = str(exc)
logger.warning("MapUpload: request error for %s: %s", pubkey[:12], exc)
@property
def status(self) -> str:
if self._client is None:
return "disconnected"
if self._last_error:
return "error"
return "connected"

View File

@@ -16,7 +16,7 @@ from app.repository.fanout import FanoutConfigRepository
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/fanout", tags=["fanout"])
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs"}
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs", "map_upload"}
_IATA_RE = re.compile(r"^[A-Z]{3}$")
_DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets"
@@ -94,6 +94,8 @@ def _validate_and_normalize_config(config_type: str, config: dict) -> dict:
_validate_apprise_config(normalized)
elif config_type == "sqs":
_validate_sqs_config(normalized)
elif config_type == "map_upload":
_validate_map_upload_config(normalized)
return normalized
@@ -295,10 +297,25 @@ def _validate_sqs_config(config: dict) -> None:
)
def _validate_map_upload_config(config: dict) -> None:
"""Validate and normalize map_upload config blob."""
api_url = str(config.get("api_url", "")).strip()
if api_url and not api_url.startswith(("http://", "https://")):
raise HTTPException(
status_code=400,
detail="api_url must start with http:// or https://",
)
# Persist the cleaned value (empty string means use the module default)
config["api_url"] = api_url
config["dry_run"] = bool(config.get("dry_run", True))
def _enforce_scope(config_type: str, scope: dict) -> dict:
"""Enforce type-specific scope constraints. Returns normalized scope."""
if config_type == "mqtt_community":
return {"messages": "none", "raw_packets": "all"}
if config_type == "map_upload":
return {"messages": "none", "raw_packets": "all"}
if config_type == "bot":
return {"messages": "all", "raw_packets": "none"}
if config_type in ("webhook", "apprise"):

View File

@@ -21,8 +21,10 @@ const TYPE_LABELS: Record<string, string> = {
webhook: 'Webhook',
apprise: 'Apprise',
sqs: 'Amazon SQS',
map_upload: 'Map Upload',
};
const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets';
const DEFAULT_COMMUNITY_BROKER_HOST = 'mqtt-us-v1.letsmesh.net';
const DEFAULT_COMMUNITY_BROKER_HOST_EU = 'mqtt-eu-v1.letsmesh.net';
@@ -35,6 +37,7 @@ const DEFAULT_MESHRANK_TRANSPORT = 'tcp';
const DEFAULT_MESHRANK_AUTH_MODE = 'none';
const DEFAULT_MESHRANK_IATA = 'XYZ';
function createCommunityConfigDefaults(
overrides: Partial<Record<string, unknown>> = {}
): Record<string, unknown> {
@@ -100,7 +103,8 @@ type DraftType =
| 'webhook'
| 'apprise'
| 'sqs'
| 'bot';
| 'bot'
| 'map_upload';
type CreateIntegrationDefinition = {
value: DraftType;
@@ -284,6 +288,23 @@ const CREATE_INTEGRATION_DEFINITIONS: readonly CreateIntegrationDefinition[] = [
scope: { messages: 'all', raw_packets: 'none' },
},
},
{
value: 'map_upload',
savedType: 'map_upload',
label: 'Map Upload',
section: 'Bulk Forwarding',
description:
'Upload node positions to map.meshcore.dev or a compatible map API endpoint.',
defaultName: 'Map Upload',
nameMode: 'counted',
defaults: {
config: {
api_url: '',
dry_run: true,
},
scope: { messages: 'none', raw_packets: 'all' },
},
},
];
const CREATE_INTEGRATION_DEFINITIONS_BY_VALUE = Object.fromEntries(
@@ -566,7 +587,9 @@ function getDefaultIntegrationName(type: string, configs: FanoutConfig[]) {
function getStatusLabel(status: string | undefined, type?: string) {
if (status === 'connected')
return type === 'bot' || type === 'webhook' || type === 'apprise' ? 'Active' : 'Connected';
return type === 'bot' || type === 'webhook' || type === 'apprise' || type === 'map_upload'
? 'Active'
: 'Connected';
if (status === 'error') return 'Error';
if (status === 'disconnected') return 'Disconnected';
return 'Inactive';
@@ -1059,6 +1082,73 @@ function BotConfigEditor({
);
}
function MapUploadConfigEditor({
config,
onChange,
}: {
config: Record<string, unknown>;
onChange: (config: Record<string, unknown>) => void;
}) {
const isDryRun = config.dry_run !== false;
return (
<div className="space-y-3">
<p className="text-xs text-muted-foreground">
Automatically upload heard repeater and room advertisements to{' '}
<a
href="https://map.meshcore.dev"
target="_blank"
rel="noopener noreferrer"
className="underline hover:text-foreground"
>
map.meshcore.dev
</a>
. Requires the radio&apos;s private key to be available (firmware must have{' '}
<code>ENABLE_PRIVATE_KEY_EXPORT=1</code>). Only raw RF packets are shared &mdash; never
decrypted messages.
</p>
<div className="rounded-md border border-warning/50 bg-warning/10 px-3 py-2 text-xs text-warning">
<strong>Dry Run is {isDryRun ? 'ON' : 'OFF'}.</strong>{' '}
{isDryRun
? 'No uploads will be sent. Check the backend logs to verify the payload looks correct before enabling live sends.'
: 'Live uploads are enabled. Each advert is rate-limited to once per hour per node.'}
</div>
<label className="flex items-center gap-3 cursor-pointer">
<input
type="checkbox"
checked={isDryRun}
onChange={(e) => onChange({ ...config, dry_run: e.target.checked })}
className="h-4 w-4 rounded border-border"
/>
<div>
<span className="text-sm font-medium">Dry Run (log only, no uploads)</span>
<p className="text-xs text-muted-foreground">
When enabled, upload payloads are logged at INFO level but not sent. Disable once you
have confirmed the logged output looks correct.
</p>
</div>
</label>
<Separator />
<div className="space-y-2">
<Label htmlFor="fanout-map-api-url">API URL (optional)</Label>
<Input
id="fanout-map-api-url"
type="url"
placeholder="https://map.meshcore.dev/api/v1/uploader/node"
value={(config.api_url as string) || ''}
onChange={(e) => onChange({ ...config, api_url: e.target.value })}
/>
<p className="text-xs text-muted-foreground">
Leave blank to use the default <code>map.meshcore.dev</code> endpoint.
</p>
</div>
</div>
);
}
type ScopeMode = 'all' | 'none' | 'only' | 'except';
function getScopeMode(value: unknown): ScopeMode {
@@ -1975,6 +2065,10 @@ export function SettingsFanoutSection({
/>
)}
{detailType === 'map_upload' && (
<MapUploadConfigEditor config={editConfig} onChange={setEditConfig} />
)}
<Separator />
<div className="flex gap-2">

View File

@@ -1790,3 +1790,113 @@ class TestManagerRestartFailure:
assert len(healthy.messages_received) == 1
assert len(dead.messages_received) == 0
# ---------------------------------------------------------------------------
# MapUploadModule integration tests
# ---------------------------------------------------------------------------
class TestMapUploadIntegration:
"""Integration tests: FanoutManager loads and dispatches to MapUploadModule."""
@pytest.mark.asyncio
async def test_map_upload_module_loaded_and_receives_raw(self, integration_db):
"""Enabled map_upload config is loaded by the manager and its on_raw is called."""
from unittest.mock import AsyncMock, MagicMock, patch
cfg = await FanoutConfigRepository.create(
config_type="map_upload",
name="Map",
config={"dry_run": True, "api_url": ""},
scope={"messages": "none", "raw_packets": "all"},
enabled=True,
)
manager = FanoutManager()
await manager.load_from_db()
assert cfg["id"] in manager._modules
module, scope = manager._modules[cfg["id"]]
assert scope == {"messages": "none", "raw_packets": "all"}
# Raw ADVERT event should be dispatched to on_raw
advert_data = {
"payload_type": "ADVERT",
"data": "aabbccdd",
"timestamp": 1000,
"id": 1,
"observation_id": 1,
}
with patch.object(module, "_upload", new_callable=AsyncMock) as mock_upload:
# Provide a parseable but minimal packet so on_raw gets past hex decode;
# parse_packet/parse_advertisement returning None is fine — on_raw silently exits
await manager.broadcast_raw(advert_data)
# Give the asyncio task a chance to run
import asyncio
await asyncio.sleep(0.05)
# _upload may or may not be called depending on parse result, but no exception
await manager.stop_all()
@pytest.mark.asyncio
async def test_map_upload_disabled_not_loaded(self, integration_db):
"""Disabled map_upload config is not loaded by the manager."""
await FanoutConfigRepository.create(
config_type="map_upload",
name="Map Disabled",
config={"dry_run": True, "api_url": ""},
scope={"messages": "none", "raw_packets": "all"},
enabled=False,
)
manager = FanoutManager()
await manager.load_from_db()
assert len(manager._modules) == 0
await manager.stop_all()
@pytest.mark.asyncio
async def test_map_upload_does_not_receive_messages(self, integration_db):
"""map_upload scope forces raw_packets only — message events must not reach it."""
from unittest.mock import AsyncMock, patch
cfg = await FanoutConfigRepository.create(
config_type="map_upload",
name="Map",
config={"dry_run": True, "api_url": ""},
scope={"messages": "none", "raw_packets": "all"},
enabled=True,
)
manager = FanoutManager()
await manager.load_from_db()
assert cfg["id"] in manager._modules
module, _ = manager._modules[cfg["id"]]
with patch.object(module, "on_message", new_callable=AsyncMock) as mock_msg:
await manager.broadcast_message({"type": "CHAN", "conversation_key": "k1", "text": "hi"})
import asyncio
await asyncio.sleep(0.05)
mock_msg.assert_not_called()
await manager.stop_all()
@pytest.mark.asyncio
async def test_map_upload_scope_enforced_on_create(self, integration_db):
"""Scope for map_upload is always fixed to raw_packets: all, messages: none."""
# Even if a custom scope is passed, the router enforces the correct one.
# Here we verify the DB record has the enforced scope.
cfg = await FanoutConfigRepository.create(
config_type="map_upload",
name="Map",
config={"dry_run": True, "api_url": ""},
scope={"messages": "all", "raw_packets": "none"}, # wrong, should be overridden by router
enabled=True,
)
# The repository stores whatever the router passes — we test the router via HTTP
# in test_api.py; here we just verify the module works with the correct scope.
assert cfg["type"] == "map_upload"

659
tests/test_map_upload.py Normal file
View File

@@ -0,0 +1,659 @@
"""Unit tests for the MapUploadModule fanout module."""
from __future__ import annotations
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.fanout.map_upload import (
MapUploadModule,
_DEFAULT_API_URL,
_REUPLOAD_SECONDS,
_get_radio_params,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_module(config: dict | None = None) -> MapUploadModule:
cfg = {"dry_run": True, "api_url": ""}
if config:
cfg.update(config)
return MapUploadModule("test-id", cfg, name="Test Map Upload")
def _advert_raw_data(payload_type: str = "ADVERT", raw_hex: str = "aabbccdd") -> dict:
return {
"payload_type": payload_type,
"data": raw_hex,
"timestamp": 1000,
"id": 1,
"observation_id": 1,
}
def _fake_advert(device_role: int = 2, timestamp: int = 2000, pubkey: str | None = None) -> MagicMock:
advert = MagicMock()
advert.device_role = device_role
advert.timestamp = timestamp
advert.public_key = pubkey or "ab" * 32
return advert
# ---------------------------------------------------------------------------
# Module lifecycle
# ---------------------------------------------------------------------------
class TestMapUploadLifecycle:
@pytest.mark.asyncio
async def test_start_creates_client(self):
mod = _make_module()
await mod.start()
assert mod._client is not None
assert mod.status == "connected"
await mod.stop()
@pytest.mark.asyncio
async def test_stop_clears_client(self):
mod = _make_module()
await mod.start()
await mod.stop()
assert mod._client is None
assert mod.status == "disconnected"
@pytest.mark.asyncio
async def test_start_clears_seen_table(self):
mod = _make_module()
mod._seen["somepubkey"] = 999
await mod.start()
assert mod._seen == {}
await mod.stop()
def test_status_error_when_last_error_set(self):
mod = _make_module()
mod._client = MagicMock()
mod._last_error = "HTTP 500"
assert mod.status == "error"
# ---------------------------------------------------------------------------
# on_raw filtering
# ---------------------------------------------------------------------------
class TestOnRawFiltering:
@pytest.mark.asyncio
async def test_non_advert_packet_ignored(self):
mod = _make_module()
await mod.start()
with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload:
await mod.on_raw(_advert_raw_data(payload_type="GROUP_TEXT"))
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_empty_data_ignored(self):
mod = _make_module()
await mod.start()
with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload:
await mod.on_raw({"payload_type": "ADVERT", "data": ""})
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_invalid_hex_ignored(self):
mod = _make_module()
await mod.start()
with patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload:
await mod.on_raw({"payload_type": "ADVERT", "data": "ZZZZ"})
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_parse_failure_ignored(self):
mod = _make_module()
await mod.start()
with (
patch("app.fanout.map_upload.parse_packet", return_value=None),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_advert_parse_failure_ignored(self):
mod = _make_module()
await mod.start()
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 10
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch("app.fanout.map_upload.parse_advertisement", return_value=None),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_chat_advert_skipped(self):
"""device_role == 1 (CHAT) must be skipped."""
mod = _make_module()
await mod.start()
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=1),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_repeater_advert_processed(self):
"""device_role == 2 (Repeater) must be uploaded."""
mod = _make_module()
await mod.start()
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=2),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_called_once()
await mod.stop()
@pytest.mark.asyncio
async def test_room_advert_processed(self):
"""device_role == 3 (Room) must be uploaded."""
mod = _make_module()
await mod.start()
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=3),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_called_once()
await mod.stop()
# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------
class TestRateLimiting:
@pytest.mark.asyncio
async def test_first_seen_pubkey_passes(self):
mod = _make_module()
await mod.start()
pubkey = "ab" * 32
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=2, timestamp=5000, pubkey=pubkey),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_called_once()
await mod.stop()
@pytest.mark.asyncio
async def test_replay_skipped(self):
"""Same or older timestamp should be skipped."""
mod = _make_module()
await mod.start()
pubkey = "ab" * 32
mod._seen[pubkey] = 5000 # already uploaded at ts=5000
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=2, timestamp=5000, pubkey=pubkey),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_within_rate_limit_window_skipped(self):
"""Newer timestamp but within 1-hr window should be skipped."""
mod = _make_module()
await mod.start()
pubkey = "ab" * 32
last_ts = 5000
mod._seen[pubkey] = last_ts
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
# 30 minutes later — still within the 1-hour window
new_ts = last_ts + (_REUPLOAD_SECONDS // 2)
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=2, timestamp=new_ts, pubkey=pubkey),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_after_rate_limit_window_passes(self):
"""Timestamp beyond the 1-hr window should be uploaded again."""
mod = _make_module()
await mod.start()
pubkey = "ab" * 32
last_ts = 5000
mod._seen[pubkey] = last_ts
mock_packet = MagicMock()
mock_packet.payload = b"\x00" * 101
new_ts = last_ts + _REUPLOAD_SECONDS + 1
with (
patch("app.fanout.map_upload.parse_packet", return_value=mock_packet),
patch(
"app.fanout.map_upload.parse_advertisement",
return_value=_fake_advert(device_role=2, timestamp=new_ts, pubkey=pubkey),
),
patch.object(mod, "_upload", new_callable=AsyncMock) as mock_upload,
):
await mod.on_raw(_advert_raw_data())
mock_upload.assert_called_once()
await mod.stop()
# ---------------------------------------------------------------------------
# Dry run behaviour
# ---------------------------------------------------------------------------
class TestDryRun:
@pytest.mark.asyncio
async def test_dry_run_logs_but_does_not_post(self):
"""dry_run=True must log the payload but never call httpx."""
mod = _make_module({"dry_run": True})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}),
):
assert mod._client is not None
post_mock = AsyncMock()
mod._client.post = post_mock # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabbccdd")
post_mock.assert_not_called()
await mod.stop()
@pytest.mark.asyncio
async def test_dry_run_updates_seen_table(self):
"""dry_run still records the pubkey so rate-limiting works."""
mod = _make_module({"dry_run": True})
await mod.start()
pubkey = "ab" * 32
fake_private = bytes(range(64))
fake_public = bytes(range(32))
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
await mod._upload(pubkey, 9999, 2, "aabb")
assert mod._seen[pubkey] == 9999
await mod.stop()
@pytest.mark.asyncio
async def test_dry_run_no_key_logs_warning_and_returns(self):
"""If private key is missing, upload should log a warning and not crash."""
mod = _make_module({"dry_run": True})
await mod.start()
with (
patch("app.fanout.map_upload.get_private_key", return_value=None),
patch("app.fanout.map_upload.get_public_key", return_value=None),
):
# Should not raise
await mod._upload("ab" * 32, 1000, 2, "aabb")
assert mod._seen == {}
await mod.stop()
# ---------------------------------------------------------------------------
# Live send behaviour
# ---------------------------------------------------------------------------
class TestLiveSend:
@pytest.mark.asyncio
async def test_live_send_posts_to_api_url(self):
"""dry_run=False should POST to the configured api_url."""
custom_url = "https://custom.example.com/api/upload"
mod = _make_module({"dry_run": False, "api_url": custom_url})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}),
):
assert mod._client is not None
post_mock = AsyncMock(return_value=mock_response)
mod._client.post = post_mock # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabbccdd")
post_mock.assert_called_once()
call_url = post_mock.call_args[0][0]
assert call_url == custom_url
await mod.stop()
@pytest.mark.asyncio
async def test_live_send_defaults_to_map_url(self):
"""Empty api_url should default to the map.meshcore.dev endpoint."""
mod = _make_module({"dry_run": False, "api_url": ""})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
assert mod._client is not None
post_mock = AsyncMock(return_value=mock_response)
mod._client.post = post_mock # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabb")
call_url = post_mock.call_args[0][0]
assert call_url == _DEFAULT_API_URL
await mod.stop()
@pytest.mark.asyncio
async def test_live_send_updates_seen_on_success(self):
mod = _make_module({"dry_run": False})
await mod.start()
pubkey = "cd" * 32
fake_private = bytes(range(64))
fake_public = bytes(range(32))
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
assert mod._client is not None
mod._client.post = AsyncMock(return_value=mock_response) # type: ignore[method-assign]
await mod._upload(pubkey, 7777, 2, "aabb")
assert mod._seen[pubkey] == 7777
await mod.stop()
@pytest.mark.asyncio
async def test_live_send_http_error_sets_last_error(self):
import httpx
mod = _make_module({"dry_run": False})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
error_response = MagicMock()
error_response.status_code = 500
error_response.text = "Internal Server Error"
exc = httpx.HTTPStatusError("500", request=MagicMock(), response=error_response)
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
assert mod._client is not None
mod._client.post = AsyncMock(side_effect=exc) # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabb")
assert mod._last_error == "HTTP 500"
assert mod.status == "error"
await mod.stop()
@pytest.mark.asyncio
async def test_live_send_request_error_sets_last_error(self):
import httpx
mod = _make_module({"dry_run": False})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
assert mod._client is not None
mod._client.post = AsyncMock(side_effect=httpx.ConnectError("conn refused")) # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabb")
assert mod._last_error is not None
assert mod.status == "error"
await mod.stop()
# ---------------------------------------------------------------------------
# Payload structure
# ---------------------------------------------------------------------------
class TestPayloadStructure:
@pytest.mark.asyncio
async def test_request_payload_has_required_fields(self):
"""The POST body must contain data, signature, and publicKey."""
mod = _make_module({"dry_run": False})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
captured: list[dict] = []
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
async def capture_post(url, *, content, headers):
captured.append(json.loads(content))
return mock_response
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 915, "cr": 5, "sf": 10, "bw": 125}),
):
assert mod._client is not None
mod._client.post = capture_post # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "aabbccdd")
assert len(captured) == 1
payload = captured[0]
assert "data" in payload
assert "signature" in payload
assert "publicKey" in payload
# data field should be parseable JSON with params and links
inner = json.loads(payload["data"])
assert "params" in inner
assert "links" in inner
assert len(inner["links"]) == 1
assert inner["links"][0] == "meshcore://aabbccdd"
# links reference the raw hex as-is
assert inner["params"]["freq"] == 915
assert inner["params"]["sf"] == 10
await mod.stop()
@pytest.mark.asyncio
async def test_public_key_hex_in_payload(self):
mod = _make_module({"dry_run": False})
await mod.start()
fake_private = bytes(range(64))
fake_public = bytes(range(32))
captured: list[dict] = []
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
async def capture_post(url, *, content, headers):
captured.append(json.loads(content))
return mock_response
with (
patch("app.fanout.map_upload.get_private_key", return_value=fake_private),
patch("app.fanout.map_upload.get_public_key", return_value=fake_public),
patch("app.fanout.map_upload._get_radio_params", return_value={"freq": 0, "cr": 0, "sf": 0, "bw": 0}),
):
assert mod._client is not None
mod._client.post = capture_post # type: ignore[method-assign]
await mod._upload("ab" * 32, 1000, 2, "ff")
assert captured[0]["publicKey"] == fake_public.hex()
await mod.stop()
# ---------------------------------------------------------------------------
# _get_radio_params
# ---------------------------------------------------------------------------
class TestGetRadioParams:
def test_returns_zeros_when_radio_not_connected(self):
with patch("app.fanout.map_upload.radio_runtime") as mock_rt:
mock_rt.meshcore = None
params = _get_radio_params()
assert params == {"freq": 0, "cr": 0, "sf": 0, "bw": 0}
def test_returns_zeros_on_exception(self):
with patch("app.fanout.map_upload.radio_runtime", side_effect=Exception("boom")):
params = _get_radio_params()
assert params == {"freq": 0, "cr": 0, "sf": 0, "bw": 0}
def test_divides_freq_and_bw_by_1000(self):
mock_rt = MagicMock()
mock_rt.meshcore.self_info = {
"radio_freq": 915000,
"radio_bw": 125000,
"radio_sf": 10,
"radio_cr": 5,
}
with patch("app.fanout.map_upload.radio_runtime", mock_rt):
params = _get_radio_params()
assert params["freq"] == 915.0
assert params["bw"] == 125.0
assert params["sf"] == 10
assert params["cr"] == 5