From 87ea2b46750e41c4e30947d9e7881835db876bd4 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 12 Mar 2026 16:45:36 -0700 Subject: [PATCH] LRU-based parallel channel storage --- app/AGENTS.md | 2 + app/radio.py | 92 +++++++++++++++++++++++++++++++ app/radio_sync.py | 1 + app/routers/messages.py | 7 ++- app/services/message_send.py | 60 +++++++++++++++----- tests/test_api.py | 8 +++ tests/test_radio.py | 2 + tests/test_radio_sync.py | 23 ++++++++ tests/test_send_messages.py | 104 +++++++++++++++++++++++++++++++++++ 9 files changed, 282 insertions(+), 17 deletions(-) diff --git a/app/AGENTS.md b/app/AGENTS.md index 69a3494..80165ee 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -99,6 +99,8 @@ app/ - Packet `path_len` values are hop counts, not byte counts. - Hop width comes from the packet or radio `path_hash_mode`: `0` = 1-byte, `1` = 2-byte, `2` = 3-byte. - Channel slot count comes from firmware-reported `DEVICE_INFO.max_channels`; do not hardcode `40` when scanning/offloading channel slots. +- Channel sends use a session-local LRU slot cache after startup channel offload clears the radio. Repeated sends to the same room reuse the loaded slot; new rooms fill free slots up to the discovered channel capacity, then evict the least recently used cached room. +- TCP radios do not reuse cached slot contents. For TCP, channel sends still force `set_channel(...)` before every send because this backend does not have exclusive device access. - Contacts persist `out_path_hash_mode` in the database so contact sync and outbound DM routing reuse the exact stored mode instead of inferring from path bytes. - Contacts may also persist `route_override_path`, `route_override_len`, and `route_override_hash_mode`. `Contact.to_radio_dict()` gives these override fields precedence over learned `last_path*`, while advert processing still updates the learned route for telemetry/fallback. - `contact_advert_paths` identity is `(public_key, path_hex, path_len)` because the same hex bytes can represent different routes at different hop widths. diff --git a/app/radio.py b/app/radio.py index d71c8a8..768b762 100644 --- a/app/radio.py +++ b/app/radio.py @@ -2,6 +2,7 @@ import asyncio import glob import logging import platform +from collections import OrderedDict from contextlib import asynccontextmanager, nullcontext from pathlib import Path @@ -132,6 +133,8 @@ class RadioManager: self.max_channels: int = 40 self.path_hash_mode: int = 0 self.path_hash_mode_supported: bool = False + self._channel_slot_by_key: OrderedDict[str, int] = OrderedDict() + self._channel_key_by_slot: dict[int, str] = {} async def _acquire_operation_lock( self, @@ -224,6 +227,94 @@ class RadioManager: await run_post_connect_setup(self) + def reset_channel_send_cache(self) -> None: + """Forget any session-local channel-slot reuse state.""" + self._channel_slot_by_key.clear() + self._channel_key_by_slot.clear() + + def channel_slot_reuse_enabled(self) -> bool: + """Return whether this transport can safely reuse cached channel slots.""" + if self._connection_info: + return not self._connection_info.startswith("TCP:") + return settings.connection_type != "tcp" + + def get_channel_send_cache_capacity(self) -> int: + """Return the app-managed channel cache capacity for the current session.""" + try: + return max(1, int(self.max_channels)) + except (TypeError, ValueError): + return 1 + + def get_cached_channel_slot(self, channel_key: str) -> int | None: + """Return the cached radio slot for a channel key, if present.""" + return self._channel_slot_by_key.get(channel_key.upper()) + + def plan_channel_send_slot( + self, + channel_key: str, + *, + preferred_slot: int = 0, + ) -> tuple[int, bool, str | None]: + """Choose a radio slot for a channel send. + + Returns `(slot, needs_configure, evicted_channel_key)`. + """ + if not self.channel_slot_reuse_enabled(): + return preferred_slot, True, None + + normalized_key = channel_key.upper() + cached_slot = self._channel_slot_by_key.get(normalized_key) + if cached_slot is not None: + return cached_slot, False, None + + capacity = self.get_channel_send_cache_capacity() + if len(self._channel_slot_by_key) < capacity: + slot = self._find_first_free_channel_slot(capacity, preferred_slot) + return slot, True, None + + evicted_key, slot = next(iter(self._channel_slot_by_key.items())) + return slot, True, evicted_key + + def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None: + """Record that a channel is now resident in the given radio slot.""" + if not self.channel_slot_reuse_enabled(): + return + + normalized_key = channel_key.upper() + previous_slot = self._channel_slot_by_key.pop(normalized_key, None) + if previous_slot is not None and previous_slot != slot: + self._channel_key_by_slot.pop(previous_slot, None) + + displaced_key = self._channel_key_by_slot.get(slot) + if displaced_key is not None and displaced_key != normalized_key: + self._channel_slot_by_key.pop(displaced_key, None) + + self._channel_key_by_slot[slot] = normalized_key + self._channel_slot_by_key[normalized_key] = slot + + def note_channel_slot_used(self, channel_key: str) -> None: + """Refresh LRU order for a previously loaded channel slot.""" + if not self.channel_slot_reuse_enabled(): + return + + normalized_key = channel_key.upper() + slot = self._channel_slot_by_key.get(normalized_key) + if slot is None: + return + self._channel_slot_by_key.move_to_end(normalized_key) + self._channel_key_by_slot[slot] = normalized_key + + def _find_first_free_channel_slot(self, capacity: int, preferred_slot: int) -> int: + """Pick the first unclaimed app-managed slot, preferring the requested slot.""" + if preferred_slot < capacity and preferred_slot not in self._channel_key_by_slot: + return preferred_slot + + for slot in range(capacity): + if slot not in self._channel_key_by_slot: + return slot + + return preferred_slot + @property def meshcore(self) -> MeshCore | None: return self._meshcore @@ -370,6 +461,7 @@ class RadioManager: self.max_channels = 40 self.path_hash_mode = 0 self.path_hash_mode_supported = False + self.reset_channel_send_cache() logger.debug("Radio disconnected") async def reconnect(self, *, broadcast_on_success: bool = True) -> bool: diff --git a/app/radio_sync.py b/app/radio_sync.py index 2131643..20e6299 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -310,6 +310,7 @@ async def sync_and_offload_channels(mc: MeshCore, max_channels: int | None = Non cleared = 0 try: + radio_manager.reset_channel_send_cache() channel_limit = get_radio_channel_limit(max_channels) # Check all available channel slots for this firmware variant diff --git a/app/routers/messages.py b/app/routers/messages.py index 4e14d16..3b8d12f 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -126,7 +126,9 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message: ) -# Temporary radio slot used for sending channel messages +# Preferred first radio slot used for sending channel messages. +# The send service may reuse/load other app-managed slots depending on transport +# and session cache state. TEMP_RADIO_SLOT = 0 @@ -155,10 +157,9 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message: expected_hash = calculate_channel_hash(key_bytes) logger.info( - "Sending to channel %s (%s) via radio slot %d, key hash: %s", + "Sending to channel %s (%s) via managed radio slot, key hash: %s", request.channel_key, db_channel.name, - TEMP_RADIO_SLOT, expected_hash, ) return await send_channel_message_to_channel( diff --git a/app/services/message_send.py b/app/services/message_send.py index be15291..2f5ae9f 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -26,10 +26,12 @@ async def send_channel_message_with_effective_scope( *, mc, channel, + channel_key: str, key_bytes: bytes, text: str, timestamp_bytes: bytes, action_label: str, + radio_manager, temp_radio_slot: int, error_broadcast_fn: BroadcastFn, app_settings_repository=AppSettingsRepository, @@ -64,28 +66,54 @@ async def send_channel_message_with_effective_scope( ) try: - set_result = await mc.commands.set_channel( - channel_idx=temp_radio_slot, - channel_name=channel.name, - channel_secret=key_bytes, + channel_slot, needs_configure, evicted_channel_key = radio_manager.plan_channel_send_slot( + channel_key, + preferred_slot=temp_radio_slot, ) - if set_result.type == EventType.ERROR: - logger.warning( - "Failed to set channel on radio slot %d before %s: %s", - temp_radio_slot, + if needs_configure: + logger.debug( + "Loading channel %s into radio slot %d before %s%s", + channel.name, + channel_slot, action_label, - set_result.payload, + ( + f" (evicting cached {evicted_channel_key[:8]})" + if evicted_channel_key is not None + else "" + ), ) - raise HTTPException( - status_code=500, - detail=f"Failed to configure channel on radio before {action_label}", + set_result = await mc.commands.set_channel( + channel_idx=channel_slot, + channel_name=channel.name, + channel_secret=key_bytes, + ) + if set_result.type == EventType.ERROR: + logger.warning( + "Failed to set channel on radio slot %d before %s: %s", + channel_slot, + action_label, + set_result.payload, + ) + raise HTTPException( + status_code=500, + detail=f"Failed to configure channel on radio before {action_label}", + ) + radio_manager.note_channel_slot_loaded(channel_key, channel_slot) + else: + logger.debug( + "Reusing cached radio slot %d for channel %s before %s", + channel_slot, + channel.name, + action_label, ) - return await mc.commands.send_chan_msg( - chan=temp_radio_slot, + send_result = await mc.commands.send_chan_msg( + chan=channel_slot, msg=text, timestamp=timestamp_bytes, ) + radio_manager.note_channel_slot_used(channel_key) + return send_result finally: if override_scope and override_scope != baseline_scope: try: @@ -223,10 +251,12 @@ async def send_channel_message_to_channel( result = await send_channel_message_with_effective_scope( mc=mc, channel=channel, + channel_key=channel_key_upper, key_bytes=key_bytes, text=text, timestamp_bytes=timestamp_bytes, action_label="sending message", + radio_manager=radio_manager, temp_radio_slot=temp_radio_slot, error_broadcast_fn=error_broadcast_fn, ) @@ -313,10 +343,12 @@ async def resend_channel_message_record( result = await send_channel_message_with_effective_scope( mc=mc, channel=channel, + channel_key=message.conversation_key, key_bytes=key_bytes, text=text_to_send, timestamp_bytes=timestamp_bytes, action_label="resending message", + radio_manager=radio_manager, temp_radio_slot=temp_radio_slot, error_broadcast_fn=error_broadcast_fn, ) diff --git a/tests/test_api.py b/tests/test_api.py index 2f2cbe0..5b395cf 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -25,9 +25,17 @@ def _reset_radio_state(): """Save/restore radio_manager state so tests don't leak.""" prev = radio_manager._meshcore prev_lock = radio_manager._operation_lock + prev_max_channels = radio_manager.max_channels + prev_connection_info = radio_manager._connection_info + prev_slot_by_key = radio_manager._channel_slot_by_key.copy() + prev_key_by_slot = radio_manager._channel_key_by_slot.copy() yield radio_manager._meshcore = prev radio_manager._operation_lock = prev_lock + radio_manager.max_channels = prev_max_channels + radio_manager._connection_info = prev_connection_info + radio_manager._channel_slot_by_key = prev_slot_by_key + radio_manager._channel_key_by_slot = prev_key_by_slot def _patch_require_connected(mc=None, *, detail="Radio not connected"): diff --git a/tests/test_radio.py b/tests/test_radio.py index 88994cb..c0b65ca 100644 --- a/tests/test_radio.py +++ b/tests/test_radio.py @@ -478,6 +478,7 @@ class TestManualDisconnectCleanup: rm.max_channels = 8 rm.path_hash_mode = 2 rm.path_hash_mode_supported = True + rm.note_channel_slot_loaded("AA" * 16, 0) await rm.disconnect() @@ -490,6 +491,7 @@ class TestManualDisconnectCleanup: assert rm.max_channels == 40 assert rm.path_hash_mode == 0 assert rm.path_hash_mode_supported is False + assert rm.get_cached_channel_slot("AA" * 16) is None @pytest.mark.asyncio async def test_pause_connection_marks_connection_undesired(self): diff --git a/tests/test_radio_sync.py b/tests/test_radio_sync.py index ab84550..075a335 100644 --- a/tests/test_radio_sync.py +++ b/tests/test_radio_sync.py @@ -37,6 +37,9 @@ def reset_sync_state(): prev_mc = radio_manager._meshcore prev_lock = radio_manager._operation_lock + prev_max_channels = radio_manager.max_channels + prev_slot_by_key = radio_manager._channel_slot_by_key.copy() + prev_key_by_slot = radio_manager._channel_key_by_slot.copy() radio_sync._polling_pause_count = 0 radio_sync._last_contact_sync = 0.0 @@ -45,6 +48,9 @@ def reset_sync_state(): radio_sync._last_contact_sync = 0.0 radio_manager._meshcore = prev_mc radio_manager._operation_lock = prev_lock + radio_manager.max_channels = prev_max_channels + radio_manager._channel_slot_by_key = prev_slot_by_key + radio_manager._channel_key_by_slot = prev_key_by_slot KEY_A = "aa" * 32 @@ -1053,6 +1059,23 @@ class TestSyncAndOffloadChannels: assert result["synced"] == 0 assert result["cleared"] == 0 + @pytest.mark.asyncio + async def test_channel_offload_resets_send_slot_cache(self): + """Clearing radio channels should invalidate session-local send-slot reuse state.""" + from app.radio_sync import sync_and_offload_channels + + empty_result = MagicMock() + empty_result.type = EventType.ERROR + + mock_mc = MagicMock() + mock_mc.commands.get_channel = AsyncMock(return_value=empty_result) + radio_manager.max_channels = 2 + radio_manager.note_channel_slot_loaded("AA" * 16, 0) + + await sync_and_offload_channels(mock_mc) + + assert radio_manager.get_cached_channel_slot("AA" * 16) is None + class TestEnsureDefaultChannels: """Test ensure_default_channels: create/fix the Public channel.""" diff --git a/tests/test_send_messages.py b/tests/test_send_messages.py index 0197105..9ce51af 100644 --- a/tests/test_send_messages.py +++ b/tests/test_send_messages.py @@ -31,9 +31,17 @@ def _reset_radio_state(): """Save/restore radio_manager state so tests don't leak.""" prev = radio_manager._meshcore prev_lock = radio_manager._operation_lock + prev_max_channels = radio_manager.max_channels + prev_connection_info = radio_manager._connection_info + prev_slot_by_key = radio_manager._channel_slot_by_key.copy() + prev_key_by_slot = radio_manager._channel_key_by_slot.copy() yield radio_manager._meshcore = prev radio_manager._operation_lock = prev_lock + radio_manager.max_channels = prev_max_channels + radio_manager._connection_info = prev_connection_info + radio_manager._channel_slot_by_key = prev_slot_by_key + radio_manager._channel_key_by_slot = prev_key_by_slot def _make_radio_result(payload=None): @@ -346,6 +354,102 @@ class TestOutgoingChannelBroadcast: mc.commands.set_channel.assert_not_awaited() mc.commands.send_chan_msg.assert_not_awaited() + @pytest.mark.asyncio + async def test_send_channel_msg_reuses_cached_slot_for_same_channel(self, test_db): + mc = _make_mc(name="MyNode") + chan_key = "b1" * 16 + await ChannelRepository.upsert(key=chan_key, name="#cached") + radio_manager.max_channels = 4 + radio_manager._connection_info = "Serial: /dev/ttyUSB0" + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.decoder.calculate_channel_hash", return_value="abcd"), + patch("app.routers.messages.broadcast_event"), + ): + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="first send") + ) + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="second send") + ) + + assert mc.commands.set_channel.await_count == 1 + assert mc.commands.send_chan_msg.await_count == 2 + assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [0, 0] + + @pytest.mark.asyncio + async def test_send_channel_msg_uses_lru_slot_eviction(self, test_db): + mc = _make_mc(name="MyNode") + chan_key_a = "c1" * 16 + chan_key_b = "c2" * 16 + chan_key_c = "c3" * 16 + await ChannelRepository.upsert(key=chan_key_a, name="#alpha") + await ChannelRepository.upsert(key=chan_key_b, name="#bravo") + await ChannelRepository.upsert(key=chan_key_c, name="#charlie") + radio_manager.max_channels = 2 + radio_manager._connection_info = "Serial: /dev/ttyUSB0" + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.decoder.calculate_channel_hash", return_value="abcd"), + patch("app.routers.messages.broadcast_event"), + ): + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_a, text="to alpha") + ) + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_b, text="to bravo") + ) + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_a, text="alpha again") + ) + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key_c, text="to charlie") + ) + + assert [call.kwargs["channel_idx"] for call in mc.commands.set_channel.await_args_list] == [ + 0, + 1, + 1, + ] + assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [ + 0, + 1, + 0, + 1, + ] + assert radio_manager.get_cached_channel_slot(chan_key_a) == 0 + assert radio_manager.get_cached_channel_slot(chan_key_b) is None + assert radio_manager.get_cached_channel_slot(chan_key_c) == 1 + + @pytest.mark.asyncio + async def test_send_channel_msg_tcp_always_reconfigures_slot(self, test_db): + mc = _make_mc(name="MyNode") + chan_key = "d1" * 16 + await ChannelRepository.upsert(key=chan_key, name="#tcp") + radio_manager.max_channels = 4 + radio_manager._connection_info = "TCP: 127.0.0.1:4000" + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.decoder.calculate_channel_hash", return_value="abcd"), + patch("app.routers.messages.broadcast_event"), + ): + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="first send") + ) + await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="second send") + ) + + assert mc.commands.set_channel.await_count == 2 + assert mc.commands.send_chan_msg.await_count == 2 + assert radio_manager.get_cached_channel_slot(chan_key) is None + class TestResendChannelMessage: """Test the user-triggered resend endpoint."""