From 546d2f2f7d4cc7189ad70e7a84959a5b5461ddaa Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 17 Jan 2026 19:01:34 -0800 Subject: [PATCH] Event handler dedupe, CLAUDE.md patchups, more (jeez ) acked field int vs bool fixes, and throw exceptions not assertions (+Pydantic v2) --- CLAUDE.md | 15 ++-- app/CLAUDE.md | 50 ++------------ app/config.py | 6 +- app/event_handlers.py | 44 ++++++++++-- app/routers/messages.py | 12 +++- tests/test_api.py | 82 ++++++++++++++++++++++ tests/test_event_handlers.py | 130 ++++++++++++++++++++++++++++++++++- 7 files changed, 271 insertions(+), 68 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index dcff214..bccf33c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -201,8 +201,8 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`). | PATCH | `/api/radio/config` | Update name, location, radio params | | POST | `/api/radio/advertise` | Send advertisement | | POST | `/api/radio/reconnect` | Manual radio reconnection | -| POST | `/api/radio/enable-server-decryption` | Export private key, enable decryption | -| GET | `/api/radio/decryption-status` | Check if decryption enabled | +| POST | `/api/radio/reboot` | Reboot radio or reconnect if disconnected | +| PUT | `/api/radio/private-key` | Import private key to radio | | GET | `/api/contacts` | List contacts | | POST | `/api/contacts/sync` | Pull from radio | | POST | `/api/contacts/{key}/telemetry` | Request telemetry from repeater | @@ -265,16 +265,11 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de ### Server-Side Decryption -The server can decrypt historical packets if given the necessary keys: +The server can decrypt historical channel packets using stored channel keys. -**Channel messages**: Decrypted automatically using stored channel keys. +**Channel messages**: Decrypted automatically when a matching channel key is available. -**Direct messages**: Requires the node's private key, which must be: -1. Exported from radio via `POST /radio/enable-server-decryption` -2. Stored **only in memory** (never persisted to disk) -3. Re-exported after every server restart - -This allows decrypting messages from contacts whose public keys were learned after the message was received. +**Direct messages**: Currently decrypted only by the MeshCore library on the radio itself. Server-side direct message decryption is not yet implemented. ## MeshCore Library diff --git a/app/CLAUDE.md b/app/CLAUDE.md index 342f923..17f0552 100644 --- a/app/CLAUDE.md +++ b/app/CLAUDE.md @@ -291,45 +291,12 @@ if result: ### Direct Message Decryption -Direct messages use ECDH key exchange (Ed25519 → X25519) with the sender's public key -and recipient's private key: +Direct messages use ECDH key exchange (Ed25519 → X25519). Server-side decryption +of direct messages is **not yet implemented**. Currently, direct messages are +decrypted by the MeshCore library on the radio itself. -```python -from app.decoder import try_decrypt_packet_with_contact_key - -result = try_decrypt_packet_with_contact_key( - raw_bytes, sender_pub_key, recipient_prv_key -) -if result: - print(f"Message: {result.message}") -``` - -**Requirements:** -- Sender's Ed25519 public key (32 bytes) -- Recipient's Ed25519 private key (64 bytes) - from ephemeral KeyStore - -### Ephemeral Key Store (`keystore.py`) - -Private keys are stored **only in memory** for security: - -```python -from app.keystore import KeyStore - -# Set private key (exported from radio) -KeyStore.set_private_key(private_key_bytes) - -# Check if available -if KeyStore.has_private_key(): - key = KeyStore.get_private_key() - -# Clear from memory -KeyStore.clear_private_key() -``` - -**Security guarantees:** -- Never written to disk -- Never logged -- Lost on server restart (must re-export from radio) +The decoder module contains a `try_decrypt_packet_with_contact_key()` function +that could support this feature in the future. ## Advertisement Parsing (`decoder.py`) @@ -432,13 +399,10 @@ All endpoints are prefixed with `/api`. ### Radio - `GET /api/radio/config` - Read config (public key, name, radio params) - `PATCH /api/radio/config` - Update name, lat/lon, tx_power, radio params -- `PUT /api/radio/private-key` - Import private key (write-only) +- `PUT /api/radio/private-key` - Import private key to radio (write-only) - `POST /api/radio/advertise?flood=true` - Send advertisement -- `POST /api/radio/reboot` - Reboot radio +- `POST /api/radio/reboot` - Reboot radio or reconnect if disconnected - `POST /api/radio/reconnect` - Manual reconnection attempt -- `POST /api/radio/enable-server-decryption` - Export private key from radio, enable server-side decryption -- `GET /api/radio/decryption-status` - Check if server-side decryption is enabled -- `POST /api/radio/disable-server-decryption` - Clear private key from memory ### Contacts - `GET /api/contacts` - List from database diff --git a/app/config.py b/app/config.py index 928ff61..5646c8e 100644 --- a/app/config.py +++ b/app/config.py @@ -1,19 +1,19 @@ import logging from typing import Literal +from pydantic import ConfigDict from pydantic_settings import BaseSettings class Settings(BaseSettings): + model_config = ConfigDict(env_prefix="MESHCORE_") + serial_port: str = "" # Empty string triggers auto-detection serial_baudrate: int = 115200 log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = "INFO" database_path: str = "data/meshcore.db" max_radio_contacts: int = 200 # Max non-repeater contacts to keep on radio for DM ACKs - class Config: - env_prefix = "MESHCORE_" - settings = Settings() diff --git a/app/event_handlers.py b/app/event_handlers.py index 2649d81..5d62831 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -10,10 +10,14 @@ from app.repository import ContactRepository, MessageRepository from app.websocket import broadcast_event if TYPE_CHECKING: - from meshcore.events import Event + from meshcore.events import Event, Subscription logger = logging.getLogger(__name__) +# Track active subscriptions so we can unsubscribe before re-registering +# This prevents handler duplication after reconnects +_active_subscriptions: list["Subscription"] = [] + # Track pending ACKs: expected_ack_code -> (message_id, timestamp, timeout_ms) _pending_acks: dict[str, tuple[int, float, int]] = {} @@ -100,7 +104,7 @@ async def on_contact_message(event: "Event") -> None: "txt_type": payload.get("txt_type", 0), "signature": payload.get("signature"), "outgoing": False, - "acked": False, + "acked": 0, }, ) @@ -200,10 +204,36 @@ def register_event_handlers(meshcore) -> None: Note: CHANNEL_MSG_RECV and ADVERTISEMENT events are NOT subscribed. These are handled by the packet processor via RX_LOG_DATA to avoid duplicate processing and ensure consistent handling. + + This function is safe to call multiple times (e.g., after reconnect). + Existing handlers are unsubscribed before new ones are registered. """ - meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message) - meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data) - meshcore.subscribe(EventType.PATH_UPDATE, on_path_update) - meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact) - meshcore.subscribe(EventType.ACK, on_ack) + global _active_subscriptions + + # Unsubscribe existing handlers to prevent duplication after reconnects. + # Try/except handles the case where the old dispatcher is in a bad state + # (e.g., after reconnect with a new MeshCore instance). + for sub in _active_subscriptions: + try: + sub.unsubscribe() + except Exception: + pass # Old dispatcher may be gone, that's fine + _active_subscriptions.clear() + + # Register handlers and track subscriptions + _active_subscriptions.append( + meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_contact_message) + ) + _active_subscriptions.append( + meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data) + ) + _active_subscriptions.append( + meshcore.subscribe(EventType.PATH_UPDATE, on_path_update) + ) + _active_subscriptions.append( + meshcore.subscribe(EventType.NEW_CONTACT, on_new_contact) + ) + _active_subscriptions.append( + meshcore.subscribe(EventType.ACK, on_ack) + ) logger.info("Event handlers registered") diff --git a/app/routers/messages.py b/app/routers/messages.py index e56b4d1..30f790b 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -95,7 +95,11 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message: received_at=now, outgoing=True, ) - assert message_id is not None # Outgoing messages are always new (unique timestamp) + if message_id is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) # Update last_contacted for the contact await ContactRepository.update_last_contacted(db_contact.public_key, now) @@ -191,7 +195,11 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message: received_at=now, outgoing=True, ) - assert message_id is not None # Outgoing messages are always new (unique timestamp) + if message_id is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) # Track for repeat detection (flood messages get confirmed by hearing repeats) track_pending_repeat(channel_key_upper, request.text, now, message_id) diff --git a/tests/test_api.py b/tests/test_api.py index 87cac67..46de768 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -120,6 +120,88 @@ class TestMessagesEndpoint: assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() + @pytest.mark.asyncio + async def test_send_direct_message_duplicate_returns_500(self): + """If MessageRepository.create returns None (duplicate), returns 500.""" + from app.models import SendDirectMessageRequest + from app.routers.messages import send_direct_message + + mock_mc = MagicMock() + mock_mc.get_contact_by_key_prefix.return_value = {"public_key": "a" * 64} + + mock_send_result = MagicMock() + mock_send_result.type = MagicMock() + mock_send_result.type.name = "OK" + mock_send_result.payload = {"expected_ack": b"\x00\x01"} + mock_mc.commands.send_msg = AsyncMock(return_value=mock_send_result) + + mock_contact = MagicMock() + mock_contact.public_key = "a" * 64 + mock_contact.to_radio_dict.return_value = {"public_key": "a" * 64} + + with ( + patch("app.dependencies.radio_manager") as mock_rm, + patch("app.repository.ContactRepository") as mock_contact_repo, + patch("app.routers.messages.MessageRepository") as mock_msg_repo, + ): + mock_rm.is_connected = True + mock_rm.meshcore = mock_mc + mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact) + mock_contact_repo.update_last_contacted = AsyncMock() + # Simulate duplicate - create returns None + mock_msg_repo.create = AsyncMock(return_value=None) + + from fastapi import HTTPException + + with pytest.raises(HTTPException) as exc_info: + await send_direct_message( + SendDirectMessageRequest(destination="a" * 64, text="Hello") + ) + + assert exc_info.value.status_code == 500 + assert "unexpected duplicate" in exc_info.value.detail.lower() + + @pytest.mark.asyncio + async def test_send_channel_message_duplicate_returns_500(self): + """If MessageRepository.create returns None (duplicate), returns 500.""" + from app.models import SendChannelMessageRequest + from app.routers.messages import send_channel_message + + mock_mc = MagicMock() + mock_send_result = MagicMock() + mock_send_result.type = MagicMock() + mock_send_result.type.name = "OK" + mock_send_result.payload = {} + mock_mc.commands.send_chan_msg = AsyncMock(return_value=mock_send_result) + mock_mc.commands.set_channel = AsyncMock(return_value=mock_send_result) + + mock_channel = MagicMock() + mock_channel.name = "test" + mock_channel.key = "0123456789ABCDEF0123456789ABCDEF" + + with ( + patch("app.dependencies.radio_manager") as mock_rm, + patch("app.repository.ChannelRepository") as mock_chan_repo, + patch("app.routers.messages.MessageRepository") as mock_msg_repo, + ): + mock_rm.is_connected = True + mock_rm.meshcore = mock_mc + mock_chan_repo.get_by_key = AsyncMock(return_value=mock_channel) + # Simulate duplicate - create returns None + mock_msg_repo.create = AsyncMock(return_value=None) + + from fastapi import HTTPException + + with pytest.raises(HTTPException) as exc_info: + await send_channel_message( + SendChannelMessageRequest( + channel_key="0123456789ABCDEF0123456789ABCDEF", text="Hello" + ) + ) + + assert exc_info.value.status_code == 500 + assert "unexpected duplicate" in exc_info.value.detail.lower() + class TestChannelsEndpoint: """Test channel-related endpoints.""" diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index da17369..3ab31e9 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -5,13 +5,15 @@ that determine message delivery confirmation. """ import time -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.event_handlers import ( + _active_subscriptions, _cleanup_expired_acks, _pending_acks, + register_event_handlers, track_pending_ack, ) from app.packet_processor import ( @@ -23,15 +25,17 @@ from app.packet_processor import ( @pytest.fixture(autouse=True) -def clear_pending_state(): - """Clear pending ACKs and repeats before each test.""" +def clear_test_state(): + """Clear pending ACKs, repeats, and subscriptions before each test.""" _pending_acks.clear() _pending_repeats.clear() _pending_repeat_expiry.clear() + _active_subscriptions.clear() yield _pending_acks.clear() _pending_repeats.clear() _pending_repeat_expiry.clear() + _active_subscriptions.clear() class TestAckTracking: @@ -278,6 +282,40 @@ class TestContactMessageCLIFiltering: # SHOULD broadcast via WebSocket mock_broadcast.assert_called_once() + @pytest.mark.asyncio + async def test_broadcast_payload_has_correct_acked_type(self): + """Broadcast payload should have acked as integer 0, not boolean False.""" + from app.event_handlers import on_contact_message + + with ( + patch("app.event_handlers.MessageRepository") as mock_repo, + patch("app.event_handlers.ContactRepository") as mock_contact_repo, + patch("app.event_handlers.broadcast_event") as mock_broadcast, + ): + mock_repo.create = AsyncMock(return_value=42) + mock_contact_repo.get_by_key_prefix = AsyncMock(return_value=None) + + class MockEvent: + payload = { + "pubkey_prefix": "abc123def456", + "text": "Test message", + "txt_type": 0, + "sender_timestamp": 1700000000, + } + + await on_contact_message(MockEvent()) + + # Verify broadcast was called + mock_broadcast.assert_called_once() + call_args = mock_broadcast.call_args + + # First arg is event type, second is payload dict + event_type, payload = call_args[0] + assert event_type == "message" + assert payload["acked"] == 0 + assert payload["acked"] is not False # Ensure it's int, not bool + assert isinstance(payload["acked"], int) + @pytest.mark.asyncio async def test_missing_txt_type_defaults_to_normal(self): """Messages without txt_type field are treated as normal (not filtered).""" @@ -303,3 +341,89 @@ class TestContactMessageCLIFiltering: # SHOULD still be processed (defaults to txt_type=0) mock_repo.create.assert_called_once() + + +class TestEventHandlerRegistration: + """Test event handler registration and cleanup.""" + + def test_register_handlers_tracks_subscriptions(self): + """Registering handlers populates _active_subscriptions.""" + mock_meshcore = MagicMock() + mock_subscription = MagicMock() + mock_meshcore.subscribe.return_value = mock_subscription + + register_event_handlers(mock_meshcore) + + # Should have 5 subscriptions (one per event type) + assert len(_active_subscriptions) == 5 + assert mock_meshcore.subscribe.call_count == 5 + + def test_register_handlers_twice_does_not_duplicate(self): + """Calling register_event_handlers twice unsubscribes old handlers first.""" + mock_meshcore = MagicMock() + + # First call: create mock subscriptions + first_subs = [MagicMock() for _ in range(5)] + mock_meshcore.subscribe.side_effect = first_subs + register_event_handlers(mock_meshcore) + + assert len(_active_subscriptions) == 5 + first_sub_objects = list(_active_subscriptions) + + # Second call: create new mock subscriptions + second_subs = [MagicMock() for _ in range(5)] + mock_meshcore.subscribe.side_effect = second_subs + register_event_handlers(mock_meshcore) + + # Old subscriptions should have been unsubscribed + for sub in first_sub_objects: + sub.unsubscribe.assert_called_once() + + # Should still have exactly 5 subscriptions (not 10) + assert len(_active_subscriptions) == 5 + + # New subscriptions should be the second batch + for sub in second_subs: + assert sub in _active_subscriptions + + def test_register_handlers_clears_before_adding(self): + """The subscription list is cleared before adding new subscriptions.""" + mock_meshcore = MagicMock() + mock_meshcore.subscribe.return_value = MagicMock() + + # Pre-populate with stale subscriptions (simulating a bug scenario) + stale_sub = MagicMock() + _active_subscriptions.append(stale_sub) + _active_subscriptions.append(stale_sub) + + register_event_handlers(mock_meshcore) + + # Stale subscriptions should have been unsubscribed + assert stale_sub.unsubscribe.call_count == 2 + + # Should have exactly 5 fresh subscriptions + assert len(_active_subscriptions) == 5 + + def test_register_handlers_survives_unsubscribe_exception(self): + """If unsubscribe() throws, registration still completes successfully.""" + mock_meshcore = MagicMock() + mock_meshcore.subscribe.return_value = MagicMock() + + # Create subscriptions where unsubscribe raises an exception + # (simulates old dispatcher being in a bad state after reconnect) + bad_sub = MagicMock() + bad_sub.unsubscribe.side_effect = RuntimeError("Dispatcher is dead") + _active_subscriptions.append(bad_sub) + + good_sub = MagicMock() + _active_subscriptions.append(good_sub) + + # Should not raise despite the exception + register_event_handlers(mock_meshcore) + + # Both unsubscribe methods should have been called + bad_sub.unsubscribe.assert_called_once() + good_sub.unsubscribe.assert_called_once() + + # Should have exactly 5 fresh subscriptions + assert len(_active_subscriptions) == 5