Monkeypatch the meshcore_py lib for frame-start handling

This commit is contained in:
Jack Kingsman
2026-03-21 22:46:59 -07:00
parent 1e21644d74
commit 9de4158a6c
2 changed files with 109 additions and 1 deletions

View File

@@ -5,10 +5,84 @@ from unittest.mock import AsyncMock, patch
import pytest
from app.main import app, lifespan
from app.main import _install_meshcore_serial_junk_prefix_patch, app, lifespan
class TestStartupLifespan:
@pytest.mark.asyncio
async def test_meshcore_serial_patch_discards_leading_junk_before_frame(self):
class RecordingReader:
def __init__(self):
self.frames = []
async def handle_rx(self, data):
self.frames.append(bytes(data))
class FakeSerialConnection:
def __init__(self):
self.header = b""
self.reader = None
self.frame_expected_size = 0
self.inframe = b""
def set_reader(self, reader):
self.reader = reader
def handle_rx(self, data: bytearray):
if len(self.header) == 0:
idx = data.find(b"\x3e")
if idx < 0:
return
self.header = data[0:1]
data = data[1:]
if len(self.header) < 3:
while len(self.header) < 3 and len(data) > 0:
self.header = self.header + data[0:1]
data = data[1:]
if len(self.header) < 3:
return
self.frame_expected_size = int.from_bytes(
self.header[1:], "little", signed=False
)
if self.frame_expected_size > 300:
self.header = b""
self.inframe = b""
self.frame_expected_size = 0
if len(data) > 0:
self.handle_rx(data)
return
upbound = self.frame_expected_size - len(self.inframe)
if len(data) < upbound:
self.inframe = self.inframe + data
return
self.inframe = self.inframe + data[0:upbound]
data = data[upbound:]
if self.reader is not None:
asyncio.create_task(self.reader.handle_rx(self.inframe))
self.inframe = b""
self.header = b""
self.frame_expected_size = 0
if len(data) > 0:
self.handle_rx(data)
_install_meshcore_serial_junk_prefix_patch(FakeSerialConnection)
conn = FakeSerialConnection()
reader = RecordingReader()
conn.set_reader(reader)
payload = b"\x00\x01\x02\x53"
frame = b"\x3e" + len(payload).to_bytes(2, "little") + payload
conn.handle_rx(b"junk bytes\r\n" + frame)
await asyncio.sleep(0)
assert reader.frames == [payload]
@pytest.mark.asyncio
async def test_lifespan_does_not_wait_for_radio_setup(self):
"""HTTP serving should start before post-connect setup finishes."""