LRU-based parallel channel storage

This commit is contained in:
Jack Kingsman
2026-03-12 16:45:36 -07:00
parent 5c85a432c8
commit 87ea2b4675
9 changed files with 282 additions and 17 deletions

View File

@@ -99,6 +99,8 @@ app/
- Packet `path_len` values are hop counts, not byte counts.
- Hop width comes from the packet or radio `path_hash_mode`: `0` = 1-byte, `1` = 2-byte, `2` = 3-byte.
- Channel slot count comes from firmware-reported `DEVICE_INFO.max_channels`; do not hardcode `40` when scanning/offloading channel slots.
- Channel sends use a session-local LRU slot cache after startup channel offload clears the radio. Repeated sends to the same room reuse the loaded slot; new rooms fill free slots up to the discovered channel capacity, then evict the least recently used cached room.
- TCP radios do not reuse cached slot contents. For TCP, channel sends still force `set_channel(...)` before every send because this backend does not have exclusive device access.
- Contacts persist `out_path_hash_mode` in the database so contact sync and outbound DM routing reuse the exact stored mode instead of inferring from path bytes.
- Contacts may also persist `route_override_path`, `route_override_len`, and `route_override_hash_mode`. `Contact.to_radio_dict()` gives these override fields precedence over learned `last_path*`, while advert processing still updates the learned route for telemetry/fallback.
- `contact_advert_paths` identity is `(public_key, path_hex, path_len)` because the same hex bytes can represent different routes at different hop widths.

View File

@@ -2,6 +2,7 @@ import asyncio
import glob
import logging
import platform
from collections import OrderedDict
from contextlib import asynccontextmanager, nullcontext
from pathlib import Path
@@ -132,6 +133,8 @@ class RadioManager:
self.max_channels: int = 40
self.path_hash_mode: int = 0
self.path_hash_mode_supported: bool = False
self._channel_slot_by_key: OrderedDict[str, int] = OrderedDict()
self._channel_key_by_slot: dict[int, str] = {}
async def _acquire_operation_lock(
self,
@@ -224,6 +227,94 @@ class RadioManager:
await run_post_connect_setup(self)
def reset_channel_send_cache(self) -> None:
"""Forget any session-local channel-slot reuse state."""
self._channel_slot_by_key.clear()
self._channel_key_by_slot.clear()
def channel_slot_reuse_enabled(self) -> bool:
"""Return whether this transport can safely reuse cached channel slots."""
if self._connection_info:
return not self._connection_info.startswith("TCP:")
return settings.connection_type != "tcp"
def get_channel_send_cache_capacity(self) -> int:
"""Return the app-managed channel cache capacity for the current session."""
try:
return max(1, int(self.max_channels))
except (TypeError, ValueError):
return 1
def get_cached_channel_slot(self, channel_key: str) -> int | None:
"""Return the cached radio slot for a channel key, if present."""
return self._channel_slot_by_key.get(channel_key.upper())
def plan_channel_send_slot(
self,
channel_key: str,
*,
preferred_slot: int = 0,
) -> tuple[int, bool, str | None]:
"""Choose a radio slot for a channel send.
Returns `(slot, needs_configure, evicted_channel_key)`.
"""
if not self.channel_slot_reuse_enabled():
return preferred_slot, True, None
normalized_key = channel_key.upper()
cached_slot = self._channel_slot_by_key.get(normalized_key)
if cached_slot is not None:
return cached_slot, False, None
capacity = self.get_channel_send_cache_capacity()
if len(self._channel_slot_by_key) < capacity:
slot = self._find_first_free_channel_slot(capacity, preferred_slot)
return slot, True, None
evicted_key, slot = next(iter(self._channel_slot_by_key.items()))
return slot, True, evicted_key
def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None:
"""Record that a channel is now resident in the given radio slot."""
if not self.channel_slot_reuse_enabled():
return
normalized_key = channel_key.upper()
previous_slot = self._channel_slot_by_key.pop(normalized_key, None)
if previous_slot is not None and previous_slot != slot:
self._channel_key_by_slot.pop(previous_slot, None)
displaced_key = self._channel_key_by_slot.get(slot)
if displaced_key is not None and displaced_key != normalized_key:
self._channel_slot_by_key.pop(displaced_key, None)
self._channel_key_by_slot[slot] = normalized_key
self._channel_slot_by_key[normalized_key] = slot
def note_channel_slot_used(self, channel_key: str) -> None:
"""Refresh LRU order for a previously loaded channel slot."""
if not self.channel_slot_reuse_enabled():
return
normalized_key = channel_key.upper()
slot = self._channel_slot_by_key.get(normalized_key)
if slot is None:
return
self._channel_slot_by_key.move_to_end(normalized_key)
self._channel_key_by_slot[slot] = normalized_key
def _find_first_free_channel_slot(self, capacity: int, preferred_slot: int) -> int:
"""Pick the first unclaimed app-managed slot, preferring the requested slot."""
if preferred_slot < capacity and preferred_slot not in self._channel_key_by_slot:
return preferred_slot
for slot in range(capacity):
if slot not in self._channel_key_by_slot:
return slot
return preferred_slot
@property
def meshcore(self) -> MeshCore | None:
return self._meshcore
@@ -370,6 +461,7 @@ class RadioManager:
self.max_channels = 40
self.path_hash_mode = 0
self.path_hash_mode_supported = False
self.reset_channel_send_cache()
logger.debug("Radio disconnected")
async def reconnect(self, *, broadcast_on_success: bool = True) -> bool:

View File

@@ -310,6 +310,7 @@ async def sync_and_offload_channels(mc: MeshCore, max_channels: int | None = Non
cleared = 0
try:
radio_manager.reset_channel_send_cache()
channel_limit = get_radio_channel_limit(max_channels)
# Check all available channel slots for this firmware variant

View File

@@ -126,7 +126,9 @@ async def send_direct_message(request: SendDirectMessageRequest) -> Message:
)
# Temporary radio slot used for sending channel messages
# Preferred first radio slot used for sending channel messages.
# The send service may reuse/load other app-managed slots depending on transport
# and session cache state.
TEMP_RADIO_SLOT = 0
@@ -155,10 +157,9 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
expected_hash = calculate_channel_hash(key_bytes)
logger.info(
"Sending to channel %s (%s) via radio slot %d, key hash: %s",
"Sending to channel %s (%s) via managed radio slot, key hash: %s",
request.channel_key,
db_channel.name,
TEMP_RADIO_SLOT,
expected_hash,
)
return await send_channel_message_to_channel(

View File

@@ -26,10 +26,12 @@ async def send_channel_message_with_effective_scope(
*,
mc,
channel,
channel_key: str,
key_bytes: bytes,
text: str,
timestamp_bytes: bytes,
action_label: str,
radio_manager,
temp_radio_slot: int,
error_broadcast_fn: BroadcastFn,
app_settings_repository=AppSettingsRepository,
@@ -64,28 +66,54 @@ async def send_channel_message_with_effective_scope(
)
try:
set_result = await mc.commands.set_channel(
channel_idx=temp_radio_slot,
channel_name=channel.name,
channel_secret=key_bytes,
channel_slot, needs_configure, evicted_channel_key = radio_manager.plan_channel_send_slot(
channel_key,
preferred_slot=temp_radio_slot,
)
if set_result.type == EventType.ERROR:
logger.warning(
"Failed to set channel on radio slot %d before %s: %s",
temp_radio_slot,
if needs_configure:
logger.debug(
"Loading channel %s into radio slot %d before %s%s",
channel.name,
channel_slot,
action_label,
set_result.payload,
(
f" (evicting cached {evicted_channel_key[:8]})"
if evicted_channel_key is not None
else ""
),
)
raise HTTPException(
status_code=500,
detail=f"Failed to configure channel on radio before {action_label}",
set_result = await mc.commands.set_channel(
channel_idx=channel_slot,
channel_name=channel.name,
channel_secret=key_bytes,
)
if set_result.type == EventType.ERROR:
logger.warning(
"Failed to set channel on radio slot %d before %s: %s",
channel_slot,
action_label,
set_result.payload,
)
raise HTTPException(
status_code=500,
detail=f"Failed to configure channel on radio before {action_label}",
)
radio_manager.note_channel_slot_loaded(channel_key, channel_slot)
else:
logger.debug(
"Reusing cached radio slot %d for channel %s before %s",
channel_slot,
channel.name,
action_label,
)
return await mc.commands.send_chan_msg(
chan=temp_radio_slot,
send_result = await mc.commands.send_chan_msg(
chan=channel_slot,
msg=text,
timestamp=timestamp_bytes,
)
radio_manager.note_channel_slot_used(channel_key)
return send_result
finally:
if override_scope and override_scope != baseline_scope:
try:
@@ -223,10 +251,12 @@ async def send_channel_message_to_channel(
result = await send_channel_message_with_effective_scope(
mc=mc,
channel=channel,
channel_key=channel_key_upper,
key_bytes=key_bytes,
text=text,
timestamp_bytes=timestamp_bytes,
action_label="sending message",
radio_manager=radio_manager,
temp_radio_slot=temp_radio_slot,
error_broadcast_fn=error_broadcast_fn,
)
@@ -313,10 +343,12 @@ async def resend_channel_message_record(
result = await send_channel_message_with_effective_scope(
mc=mc,
channel=channel,
channel_key=message.conversation_key,
key_bytes=key_bytes,
text=text_to_send,
timestamp_bytes=timestamp_bytes,
action_label="resending message",
radio_manager=radio_manager,
temp_radio_slot=temp_radio_slot,
error_broadcast_fn=error_broadcast_fn,
)

View File

@@ -25,9 +25,17 @@ def _reset_radio_state():
"""Save/restore radio_manager state so tests don't leak."""
prev = radio_manager._meshcore
prev_lock = radio_manager._operation_lock
prev_max_channels = radio_manager.max_channels
prev_connection_info = radio_manager._connection_info
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
yield
radio_manager._meshcore = prev
radio_manager._operation_lock = prev_lock
radio_manager.max_channels = prev_max_channels
radio_manager._connection_info = prev_connection_info
radio_manager._channel_slot_by_key = prev_slot_by_key
radio_manager._channel_key_by_slot = prev_key_by_slot
def _patch_require_connected(mc=None, *, detail="Radio not connected"):

View File

@@ -478,6 +478,7 @@ class TestManualDisconnectCleanup:
rm.max_channels = 8
rm.path_hash_mode = 2
rm.path_hash_mode_supported = True
rm.note_channel_slot_loaded("AA" * 16, 0)
await rm.disconnect()
@@ -490,6 +491,7 @@ class TestManualDisconnectCleanup:
assert rm.max_channels == 40
assert rm.path_hash_mode == 0
assert rm.path_hash_mode_supported is False
assert rm.get_cached_channel_slot("AA" * 16) is None
@pytest.mark.asyncio
async def test_pause_connection_marks_connection_undesired(self):

View File

@@ -37,6 +37,9 @@ def reset_sync_state():
prev_mc = radio_manager._meshcore
prev_lock = radio_manager._operation_lock
prev_max_channels = radio_manager.max_channels
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
radio_sync._polling_pause_count = 0
radio_sync._last_contact_sync = 0.0
@@ -45,6 +48,9 @@ def reset_sync_state():
radio_sync._last_contact_sync = 0.0
radio_manager._meshcore = prev_mc
radio_manager._operation_lock = prev_lock
radio_manager.max_channels = prev_max_channels
radio_manager._channel_slot_by_key = prev_slot_by_key
radio_manager._channel_key_by_slot = prev_key_by_slot
KEY_A = "aa" * 32
@@ -1053,6 +1059,23 @@ class TestSyncAndOffloadChannels:
assert result["synced"] == 0
assert result["cleared"] == 0
@pytest.mark.asyncio
async def test_channel_offload_resets_send_slot_cache(self):
"""Clearing radio channels should invalidate session-local send-slot reuse state."""
from app.radio_sync import sync_and_offload_channels
empty_result = MagicMock()
empty_result.type = EventType.ERROR
mock_mc = MagicMock()
mock_mc.commands.get_channel = AsyncMock(return_value=empty_result)
radio_manager.max_channels = 2
radio_manager.note_channel_slot_loaded("AA" * 16, 0)
await sync_and_offload_channels(mock_mc)
assert radio_manager.get_cached_channel_slot("AA" * 16) is None
class TestEnsureDefaultChannels:
"""Test ensure_default_channels: create/fix the Public channel."""

View File

@@ -31,9 +31,17 @@ def _reset_radio_state():
"""Save/restore radio_manager state so tests don't leak."""
prev = radio_manager._meshcore
prev_lock = radio_manager._operation_lock
prev_max_channels = radio_manager.max_channels
prev_connection_info = radio_manager._connection_info
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
yield
radio_manager._meshcore = prev
radio_manager._operation_lock = prev_lock
radio_manager.max_channels = prev_max_channels
radio_manager._connection_info = prev_connection_info
radio_manager._channel_slot_by_key = prev_slot_by_key
radio_manager._channel_key_by_slot = prev_key_by_slot
def _make_radio_result(payload=None):
@@ -346,6 +354,102 @@ class TestOutgoingChannelBroadcast:
mc.commands.set_channel.assert_not_awaited()
mc.commands.send_chan_msg.assert_not_awaited()
@pytest.mark.asyncio
async def test_send_channel_msg_reuses_cached_slot_for_same_channel(self, test_db):
mc = _make_mc(name="MyNode")
chan_key = "b1" * 16
await ChannelRepository.upsert(key=chan_key, name="#cached")
radio_manager.max_channels = 4
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
with (
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.decoder.calculate_channel_hash", return_value="abcd"),
patch("app.routers.messages.broadcast_event"),
):
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key, text="first send")
)
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key, text="second send")
)
assert mc.commands.set_channel.await_count == 1
assert mc.commands.send_chan_msg.await_count == 2
assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [0, 0]
@pytest.mark.asyncio
async def test_send_channel_msg_uses_lru_slot_eviction(self, test_db):
mc = _make_mc(name="MyNode")
chan_key_a = "c1" * 16
chan_key_b = "c2" * 16
chan_key_c = "c3" * 16
await ChannelRepository.upsert(key=chan_key_a, name="#alpha")
await ChannelRepository.upsert(key=chan_key_b, name="#bravo")
await ChannelRepository.upsert(key=chan_key_c, name="#charlie")
radio_manager.max_channels = 2
radio_manager._connection_info = "Serial: /dev/ttyUSB0"
with (
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.decoder.calculate_channel_hash", return_value="abcd"),
patch("app.routers.messages.broadcast_event"),
):
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key_a, text="to alpha")
)
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key_b, text="to bravo")
)
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key_a, text="alpha again")
)
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key_c, text="to charlie")
)
assert [call.kwargs["channel_idx"] for call in mc.commands.set_channel.await_args_list] == [
0,
1,
1,
]
assert [call.kwargs["chan"] for call in mc.commands.send_chan_msg.await_args_list] == [
0,
1,
0,
1,
]
assert radio_manager.get_cached_channel_slot(chan_key_a) == 0
assert radio_manager.get_cached_channel_slot(chan_key_b) is None
assert radio_manager.get_cached_channel_slot(chan_key_c) == 1
@pytest.mark.asyncio
async def test_send_channel_msg_tcp_always_reconfigures_slot(self, test_db):
mc = _make_mc(name="MyNode")
chan_key = "d1" * 16
await ChannelRepository.upsert(key=chan_key, name="#tcp")
radio_manager.max_channels = 4
radio_manager._connection_info = "TCP: 127.0.0.1:4000"
with (
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.decoder.calculate_channel_hash", return_value="abcd"),
patch("app.routers.messages.broadcast_event"),
):
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key, text="first send")
)
await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key, text="second send")
)
assert mc.commands.set_channel.await_count == 2
assert mc.commands.send_chan_msg.await_count == 2
assert radio_manager.get_cached_channel_slot(chan_key) is None
class TestResendChannelMessage:
"""Test the user-triggered resend endpoint."""