Event handler dedupe, CLAUDE.md patchups, more (jeez

) acked field int vs bool fixes, and throw exceptions not assertions (+Pydantic v2)
This commit is contained in:
Jack Kingsman
2026-01-17 19:01:34 -08:00
parent 94bcf42cff
commit 546d2f2f7d
7 changed files with 271 additions and 68 deletions

View File

@@ -201,8 +201,8 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`).
| PATCH | `/api/radio/config` | Update name, location, radio params |
| POST | `/api/radio/advertise` | Send advertisement |
| POST | `/api/radio/reconnect` | Manual radio reconnection |
| POST | `/api/radio/enable-server-decryption` | Export private key, enable decryption |
| GET | `/api/radio/decryption-status` | Check if decryption enabled |
| POST | `/api/radio/reboot` | Reboot radio or reconnect if disconnected |
| PUT | `/api/radio/private-key` | Import private key to radio |
| GET | `/api/contacts` | List contacts |
| POST | `/api/contacts/sync` | Pull from radio |
| POST | `/api/contacts/{key}/telemetry` | Request telemetry from repeater |
@@ -265,16 +265,11 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de
### Server-Side Decryption
The server can decrypt historical packets if given the necessary keys:
The server can decrypt historical channel packets using stored channel keys.
**Channel messages**: Decrypted automatically using stored channel keys.
**Channel messages**: Decrypted automatically when a matching channel key is available.
**Direct messages**: Requires the node's private key, which must be:
1. Exported from radio via `POST /radio/enable-server-decryption`
2. Stored **only in memory** (never persisted to disk)
3. Re-exported after every server restart
This allows decrypting messages from contacts whose public keys were learned after the message was received.
**Direct messages**: Currently decrypted only by the MeshCore library on the radio itself. Server-side direct message decryption is not yet implemented.
## MeshCore Library

View File

@@ -291,45 +291,12 @@ if result:
### Direct Message Decryption
Direct messages use ECDH key exchange (Ed25519 → X25519) with the sender's public key
and recipient's private key:
Direct messages use ECDH key exchange (Ed25519 → X25519). Server-side decryption
of direct messages is **not yet implemented**. Currently, direct messages are
decrypted by the MeshCore library on the radio itself.
```python
from app.decoder import try_decrypt_packet_with_contact_key
result = try_decrypt_packet_with_contact_key(
raw_bytes, sender_pub_key, recipient_prv_key
)
if result:
print(f"Message: {result.message}")
```
**Requirements:**
- Sender's Ed25519 public key (32 bytes)
- Recipient's Ed25519 private key (64 bytes) - from ephemeral KeyStore
### Ephemeral Key Store (`keystore.py`)
Private keys are stored **only in memory** for security:
```python
from app.keystore import KeyStore
# Set private key (exported from radio)
KeyStore.set_private_key(private_key_bytes)
# Check if available
if KeyStore.has_private_key():
key = KeyStore.get_private_key()
# Clear from memory
KeyStore.clear_private_key()
```
**Security guarantees:**
- Never written to disk
- Never logged
- Lost on server restart (must re-export from radio)
The decoder module contains a `try_decrypt_packet_with_contact_key()` function
that could support this feature in the future.
## Advertisement Parsing (`decoder.py`)
@@ -432,13 +399,10 @@ All endpoints are prefixed with `/api`.
### Radio
- `GET /api/radio/config` - Read config (public key, name, radio params)
- `PATCH /api/radio/config` - Update name, lat/lon, tx_power, radio params
- `PUT /api/radio/private-key` - Import private key (write-only)
- `PUT /api/radio/private-key` - Import private key to radio (write-only)
- `POST /api/radio/advertise?flood=true` - Send advertisement
- `POST /api/radio/reboot` - Reboot radio
- `POST /api/radio/reboot` - Reboot radio or reconnect if disconnected
- `POST /api/radio/reconnect` - Manual reconnection attempt
- `POST /api/radio/enable-server-decryption` - Export private key from radio, enable server-side decryption
- `GET /api/radio/decryption-status` - Check if server-side decryption is enabled
- `POST /api/radio/disable-server-decryption` - Clear private key from memory
### Contacts
- `GET /api/contacts` - List from database

View File

@@ -1,19 +1,19 @@
import logging
from typing import Literal
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
model_config = ConfigDict(env_prefix="MESHCORE_")
serial_port: str = "" # Empty string triggers auto-detection
serial_baudrate: int = 115200
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = "INFO"
database_path: str = "data/meshcore.db"
max_radio_contacts: int = 200 # Max non-repeater contacts to keep on radio for DM ACKs
class Config:
env_prefix = "MESHCORE_"
settings = Settings()

View File

@@ -10,10 +10,14 @@ from app.repository import ContactRepository, MessageRepository
from app.websocket import broadcast_event
if TYPE_CHECKING:
from meshcore.events import Event
from meshcore.events import Event, Subscription
logger = logging.getLogger(__name__)
# Track active subscriptions so we can unsubscribe before re-registering
# This prevents handler duplication after reconnects
_active_subscriptions: list["Subscription"] = []
# Track pending ACKs: expected_ack_code -> (message_id, timestamp, timeout_ms)
_pending_acks: dict[str, tuple[int, float, int]] = {}
@@ -100,7 +104,7 @@ async def on_contact_message(event: "Event") -> None:
"txt_type": payload.get("txt_type", 0),
"signature": payload.get("signature"),
"outgoing": False,
"acked": False,
"acked": 0,
},
)
@@ -200,10 +204,36 @@ def register_event_handlers(meshcore) -> None:
Note: CHANNEL_MSG_RECV and ADVERTISEMENT events are NOT subscribed.
These are handled by the packet processor via RX_LOG_DATA to avoid
duplicate processing and ensure consistent handling.
This function is safe to call multiple times (e.g., after reconnect).
Existing handlers are unsubscribed before new ones are registered.
"""
meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message)
meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data)
meshcore.subscribe(EventType.PATH_UPDATE, on_path_update)
meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact)
meshcore.subscribe(EventType.ACK, on_ack)
global _active_subscriptions
# Unsubscribe existing handlers to prevent duplication after reconnects.
# Try/except handles the case where the old dispatcher is in a bad state
# (e.g., after reconnect with a new MeshCore instance).
for sub in _active_subscriptions:
try:
sub.unsubscribe()
except Exception:
pass # Old dispatcher may be gone, that's fine
_active_subscriptions.clear()
# Register handlers and track subscriptions
_active_subscriptions.append(
meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message)
)
_active_subscriptions.append(
meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data)
)
_active_subscriptions.append(
meshcore.subscribe(EventType.PATH_UPDATE, on_path_update)
)
_active_subscriptions.append(
meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact)
)
_active_subscriptions.append(
meshcore.subscribe(EventType.ACK, on_ack)
)
logger.info("Event handlers registered")

View File

@@ -95,7 +95,11 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message:
received_at=now,
outgoing=True,
)
assert message_id is not None # Outgoing messages are always new (unique timestamp)
if message_id is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
)
# Update last_contacted for the contact
await ContactRepository.update_last_contacted(db_contact.public_key, now)
@@ -191,7 +195,11 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
received_at=now,
outgoing=True,
)
assert message_id is not None # Outgoing messages are always new (unique timestamp)
if message_id is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
)
# Track for repeat detection (flood messages get confirmed by hearing repeats)
track_pending_repeat(channel_key_upper, request.text, now, message_id)

View File

@@ -120,6 +120,88 @@ class TestMessagesEndpoint:
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_send_direct_message_duplicate_returns_500(self):
"""If MessageRepository.create returns None (duplicate), returns 500."""
from app.models import SendDirectMessageRequest
from app.routers.messages import send_direct_message
mock_mc = MagicMock()
mock_mc.get_contact_by_key_prefix.return_value = {"public_key": "a" * 64}
mock_send_result = MagicMock()
mock_send_result.type = MagicMock()
mock_send_result.type.name = "OK"
mock_send_result.payload = {"expected_ack": b"\x00\x01"}
mock_mc.commands.send_msg = AsyncMock(return_value=mock_send_result)
mock_contact = MagicMock()
mock_contact.public_key = "a" * 64
mock_contact.to_radio_dict.return_value = {"public_key": "a" * 64}
with (
patch("app.dependencies.radio_manager") as mock_rm,
patch("app.repository.ContactRepository") as mock_contact_repo,
patch("app.routers.messages.MessageRepository") as mock_msg_repo,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
mock_contact_repo.update_last_contacted = AsyncMock()
# Simulate duplicate - create returns None
mock_msg_repo.create = AsyncMock(return_value=None)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await send_direct_message(
SendDirectMessageRequest(destination="a" * 64, text="Hello")
)
assert exc_info.value.status_code == 500
assert "unexpected duplicate" in exc_info.value.detail.lower()
@pytest.mark.asyncio
async def test_send_channel_message_duplicate_returns_500(self):
"""If MessageRepository.create returns None (duplicate), returns 500."""
from app.models import SendChannelMessageRequest
from app.routers.messages import send_channel_message
mock_mc = MagicMock()
mock_send_result = MagicMock()
mock_send_result.type = MagicMock()
mock_send_result.type.name = "OK"
mock_send_result.payload = {}
mock_mc.commands.send_chan_msg = AsyncMock(return_value=mock_send_result)
mock_mc.commands.set_channel = AsyncMock(return_value=mock_send_result)
mock_channel = MagicMock()
mock_channel.name = "test"
mock_channel.key = "0123456789ABCDEF0123456789ABCDEF"
with (
patch("app.dependencies.radio_manager") as mock_rm,
patch("app.repository.ChannelRepository") as mock_chan_repo,
patch("app.routers.messages.MessageRepository") as mock_msg_repo,
):
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
mock_chan_repo.get_by_key = AsyncMock(return_value=mock_channel)
# Simulate duplicate - create returns None
mock_msg_repo.create = AsyncMock(return_value=None)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await send_channel_message(
SendChannelMessageRequest(
channel_key="0123456789ABCDEF0123456789ABCDEF", text="Hello"
)
)
assert exc_info.value.status_code == 500
assert "unexpected duplicate" in exc_info.value.detail.lower()
class TestChannelsEndpoint:
"""Test channel-related endpoints."""

View File

@@ -5,13 +5,15 @@ that determine message delivery confirmation.
"""
import time
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.event_handlers import (
_active_subscriptions,
_cleanup_expired_acks,
_pending_acks,
register_event_handlers,
track_pending_ack,
)
from app.packet_processor import (
@@ -23,15 +25,17 @@ from app.packet_processor import (
@pytest.fixture(autouse=True)
def clear_pending_state():
"""Clear pending ACKs and repeats before each test."""
def clear_test_state():
"""Clear pending ACKs, repeats, and subscriptions before each test."""
_pending_acks.clear()
_pending_repeats.clear()
_pending_repeat_expiry.clear()
_active_subscriptions.clear()
yield
_pending_acks.clear()
_pending_repeats.clear()
_pending_repeat_expiry.clear()
_active_subscriptions.clear()
class TestAckTracking:
@@ -278,6 +282,40 @@ class TestContactMessageCLIFiltering:
# SHOULD broadcast via WebSocket
mock_broadcast.assert_called_once()
@pytest.mark.asyncio
async def test_broadcast_payload_has_correct_acked_type(self):
"""Broadcast payload should have acked as integer 0, not boolean False."""
from app.event_handlers import on_contact_message
with (
patch("app.event_handlers.MessageRepository") as mock_repo,
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
patch("app.event_handlers.broadcast_event") as mock_broadcast,
):
mock_repo.create = AsyncMock(return_value=42)
mock_contact_repo.get_by_key_prefix = AsyncMock(return_value=None)
class MockEvent:
payload = {
"pubkey_prefix": "abc123def456",
"text": "Test message",
"txt_type": 0,
"sender_timestamp": 1700000000,
}
await on_contact_message(MockEvent())
# Verify broadcast was called
mock_broadcast.assert_called_once()
call_args = mock_broadcast.call_args
# First arg is event type, second is payload dict
event_type, payload = call_args[0]
assert event_type == "message"
assert payload["acked"] == 0
assert payload["acked"] is not False # Ensure it's int, not bool
assert isinstance(payload["acked"], int)
@pytest.mark.asyncio
async def test_missing_txt_type_defaults_to_normal(self):
"""Messages without txt_type field are treated as normal (not filtered)."""
@@ -303,3 +341,89 @@ class TestContactMessageCLIFiltering:
# SHOULD still be processed (defaults to txt_type=0)
mock_repo.create.assert_called_once()
class TestEventHandlerRegistration:
"""Test event handler registration and cleanup."""
def test_register_handlers_tracks_subscriptions(self):
"""Registering handlers populates _active_subscriptions."""
mock_meshcore = MagicMock()
mock_subscription = MagicMock()
mock_meshcore.subscribe.return_value = mock_subscription
register_event_handlers(mock_meshcore)
# Should have 5 subscriptions (one per event type)
assert len(_active_subscriptions) == 5
assert mock_meshcore.subscribe.call_count == 5
def test_register_handlers_twice_does_not_duplicate(self):
"""Calling register_event_handlers twice unsubscribes old handlers first."""
mock_meshcore = MagicMock()
# First call: create mock subscriptions
first_subs = [MagicMock() for _ in range(5)]
mock_meshcore.subscribe.side_effect = first_subs
register_event_handlers(mock_meshcore)
assert len(_active_subscriptions) == 5
first_sub_objects = list(_active_subscriptions)
# Second call: create new mock subscriptions
second_subs = [MagicMock() for _ in range(5)]
mock_meshcore.subscribe.side_effect = second_subs
register_event_handlers(mock_meshcore)
# Old subscriptions should have been unsubscribed
for sub in first_sub_objects:
sub.unsubscribe.assert_called_once()
# Should still have exactly 5 subscriptions (not 10)
assert len(_active_subscriptions) == 5
# New subscriptions should be the second batch
for sub in second_subs:
assert sub in _active_subscriptions
def test_register_handlers_clears_before_adding(self):
"""The subscription list is cleared before adding new subscriptions."""
mock_meshcore = MagicMock()
mock_meshcore.subscribe.return_value = MagicMock()
# Pre-populate with stale subscriptions (simulating a bug scenario)
stale_sub = MagicMock()
_active_subscriptions.append(stale_sub)
_active_subscriptions.append(stale_sub)
register_event_handlers(mock_meshcore)
# Stale subscriptions should have been unsubscribed
assert stale_sub.unsubscribe.call_count == 2
# Should have exactly 5 fresh subscriptions
assert len(_active_subscriptions) == 5
def test_register_handlers_survives_unsubscribe_exception(self):
"""If unsubscribe() throws, registration still completes successfully."""
mock_meshcore = MagicMock()
mock_meshcore.subscribe.return_value = MagicMock()
# Create subscriptions where unsubscribe raises an exception
# (simulates old dispatcher being in a bad state after reconnect)
bad_sub = MagicMock()
bad_sub.unsubscribe.side_effect = RuntimeError("Dispatcher is dead")
_active_subscriptions.append(bad_sub)
good_sub = MagicMock()
_active_subscriptions.append(good_sub)
# Should not raise despite the exception
register_event_handlers(mock_meshcore)
# Both unsubscribe methods should have been called
bad_sub.unsubscribe.assert_called_once()
good_sub.unsubscribe.assert_called_once()
# Should have exactly 5 fresh subscriptions
assert len(_active_subscriptions) == 5