Enhance Meshcore Companion Protocol and Documentation

- Added structured decoding for companion contact records, self info, device info, and new adverts in MeshcoreCompanionProtocol.
- Improved logging for companion events and updated health monitoring.
- Updated README and configuration documentation to reflect new companion settings and structured event logging.
- Enhanced terminal command center to display companion-specific settings only when applicable.
- Added tests for new companion protocol features and ensured proper handling of internal-only events.
This commit is contained in:
sh4un
2026-05-19 21:52:38 -04:00
parent 25b46d9555
commit ed617be1db
12 changed files with 641 additions and 62 deletions
+4
View File
@@ -14,6 +14,10 @@
- **Added** `channel_index` key to `external_message` dict
- **Added** Meshtastic ID to messages bridged to MeshCore
- **Added** contact packet types to protocol handler
- **Added** structured decoding for companion contact records, self info, device info, and new adverts.
- **Improved** companion event visibility in sync logs and the command center log tail.
- **Updated** the terminal command center configuration panel to show companion-only settings only when `companion_radio` is active, including `COMPANION_DEBUG`.
- **Updated** configuration, usage, README, and architecture documentation for the companion metadata flow.
---
## Version 2.0.0 - Comprehensive Code Review and Enhancements (December 31, 2025)
+1
View File
@@ -52,6 +52,7 @@ Copy `examples/config.ini.example` to `config.ini` and edit it.
- `COMPANION_DEBUG = False` (enable raw byte logging)
- `SERIAL_AUTO_SWITCH = True` (auto-switch between `json_newline` and `raw_serial` on repeated decode failures)
- `MESHTASTIC_CHANNEL_INDEX = 1` and `MESHCORE_CHANNEL_INDEX = 2` e.g. only bridge messages from Meshtastic channel index 1 to/from MeshCore channel index 2
- Companion device info, self info, contact sync, and adverts are decoded into structured events and surfaced in the sync logs and the terminal command center log tail
- **For MQTT:**
Set `EXTERNAL_TRANSPORT = mqtt` and configure broker details. Optionally enable TLS/SSL for secure connections.
+153 -4
View File
@@ -269,6 +269,152 @@ class MeshcoreHandler:
except Exception as e:
self.logger.warning("Failed to send CMD_SYNC_NEXT_MESSAGE: %s", e)
@staticmethod
def _short_companion_id(identifier: Optional[str]) -> str:
if not identifier:
return "unknown"
return identifier[:8]
def _format_companion_event(self, decoded_msg: Dict[str, Any]) -> str:
kind = decoded_msg.get("companion_kind") or "unknown"
if kind == "ok":
return "Companion command acknowledged."
if kind == "err":
return (
"Companion error response received"
f" (code={decoded_msg.get('error_code')})."
)
if kind == "sent":
return (
"Companion queued outbound message"
f" (ack={decoded_msg.get('ack_tag')}, timeout={decoded_msg.get('timeout_ms')}ms)."
)
if kind == "msg_waiting":
return "Companion reports queued messages waiting."
if kind == "no_more_messages":
return "Companion message queue drained."
if kind == "contact_start":
return (
"Companion contact sync started"
f" ({decoded_msg.get('contact_count', 'unknown')} contacts)."
)
if kind == "contact_info":
contact = decoded_msg.get("contact") or {}
label = contact.get("adv_name") or self._short_companion_id(
contact.get("public_key")
)
return (
"Companion contact discovered: "
f"{label} [{self._short_companion_id(contact.get('public_key'))}]."
)
if kind == "contact_end":
return (
"Companion contact sync finished"
f" (lastmod={decoded_msg.get('lastmod')})."
)
if kind == "self_info":
info = decoded_msg.get("self_info") or {}
label = info.get("name") or self._short_companion_id(
info.get("public_key")
)
radio_freq = info.get("radio_freq")
if isinstance(radio_freq, (int, float)):
return f"Companion self info: {label} @ {radio_freq:.3f} MHz."
return f"Companion self info: {label}."
if kind == "device_info":
info = decoded_msg.get("device_info") or {}
model = info.get("model") or "MeshCore device"
version = info.get("ver")
if version:
version_label = version
else:
fw_ver = info.get("fw_ver")
version_label = f"fw {fw_ver}" if fw_ver is not None else "firmware unknown"
capacity = []
if info.get("max_contacts") is not None:
capacity.append(f"{info['max_contacts']} contacts")
if info.get("max_channels") is not None:
capacity.append(f"{info['max_channels']} channels")
suffix = f" ({', '.join(capacity)})" if capacity else ""
return f"Companion device info: {model} {version_label}{suffix}."
if kind == "advert":
return (
"Companion advert received from "
f"{self._short_companion_id(decoded_msg.get('pubkey'))}."
)
if kind == "new_advert":
advert = decoded_msg.get("advert") or {}
label = advert.get("adv_name") or self._short_companion_id(
advert.get("public_key")
)
return (
"Companion advert discovered: "
f"{label} [{self._short_companion_id(advert.get('public_key'))}]."
)
if kind == "send_confirmed":
return (
"Companion send confirmed"
f" (ack={decoded_msg.get('ack_code')}, rtt={decoded_msg.get('round_trip_ms')}ms)."
)
if kind == "log_data":
return "Companion log data received."
return f"Companion event: {kind}."
def _update_companion_health(self, decoded_msg: Dict[str, Any]) -> None:
if self.health_monitor.get_component_health("external") is None:
return
kind = decoded_msg.get("companion_kind")
if kind == "self_info":
info = decoded_msg.get("self_info") or {}
label = info.get("name") or self._short_companion_id(
info.get("public_key")
)
details = {
"companion_name": info.get("name"),
"companion_public_key": info.get("public_key"),
"companion_radio_freq_mhz": info.get("radio_freq"),
"companion_radio_bw_khz": info.get("radio_bw"),
}
self.health_monitor.update_component(
"external",
HealthStatus.HEALTHY,
f"Companion identity: {label}",
details={
key: value
for key, value in details.items()
if value not in (None, "")
},
)
elif kind == "device_info":
info = decoded_msg.get("device_info") or {}
model = info.get("model") or "MeshCore device"
version = info.get("ver")
if not version and info.get("fw_ver") is not None:
version = f"fw {info['fw_ver']}"
details = {
"companion_model": info.get("model"),
"companion_version": info.get("ver"),
"companion_fw_ver": info.get("fw_ver"),
"companion_max_contacts": info.get("max_contacts"),
"companion_max_channels": info.get("max_channels"),
}
message = f"Companion device: {model}"
if version:
message = f"{message} {version}"
self.health_monitor.update_component(
"external",
HealthStatus.HEALTHY,
message,
details={
key: value
for key, value in details.items()
if value not in (None, "")
},
)
def _contacts_poll_loop(self) -> None:
"""Periodically request contacts from MeshCore to surface adverts."""
self.logger.info(
@@ -372,8 +518,9 @@ class MeshcoreHandler:
if decoded_msg.get("internal_only"):
kind = decoded_msg.get("companion_kind")
self.logger.info(
"Companion event: %s", kind
self._format_companion_event(decoded_msg)
)
self._update_companion_health(decoded_msg)
# Handle message polling
if kind == "msg_waiting":
if not self._companion_msg_polling:
@@ -627,10 +774,12 @@ class MeshcoreHandler:
# CMD_SEND_CHANNEL_TXT_MSG (3)
txt_type = 0
channel_idx = int(item.get("channel_index", 0))
sender_meshtastic_id = item.get("sender_meshtastic_id", "Unknown")
sender_meshtastic_id = item.get("sender_meshtastic_id")
# Prepend Meshtastic ID to outgoing MeshCore message
payload = f"{sender_meshtastic_id}: {payload}"
# Preserve the raw text when the upstream message does not carry a
# sender ID; only prepend resolved Meshtastic IDs.
if isinstance(sender_meshtastic_id, str) and sender_meshtastic_id.strip():
payload = f"{sender_meshtastic_id}: {payload}"
if self.config.meshtastic_channel_index is not None and self.config.meshcore_channel_index is not None:
if channel_idx == self.config.meshtastic_channel_index:
+133 -28
View File
@@ -166,8 +166,6 @@ class RawSerialProtocol(MeshcoreProtocolHandler):
# --- Meshcore Companion Radio Protocol Handler ---
import struct
class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
"""
Handles the Meshcore Companion Radio Protocol (USB framing).
@@ -242,6 +240,120 @@ class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
self.logger.error("Error encoding companion frame: %s", e)
return None
@staticmethod
def _decode_companion_string(raw_data: bytes) -> str:
return raw_data.split(b"\x00", 1)[0].decode("utf-8", "ignore")
def _decode_contact_record(
self,
raw_data: bytes,
*,
key_name: str,
payload_key: str,
) -> Optional[Dict[str, Any]]:
minimum_length = 1 + 32 + 1 + 1 + 1 + 64 + 32 + 4 + 4 + 4 + 4
if len(raw_data) < minimum_length:
self.logger.warning(
"Companion contact record too short to decode: %s",
raw_data.hex(),
)
return None
path_len = int.from_bytes(raw_data[35:36], "little", signed=True)
path_bytes = raw_data[36:100]
effective_path_len = max(0, min(path_len, len(path_bytes)))
record = {
"public_key": raw_data[1:33].hex(),
"type": raw_data[33],
"flags": raw_data[34],
"out_path_len": path_len,
"out_path": path_bytes[:effective_path_len].hex(),
"adv_name": self._decode_companion_string(raw_data[100:132]),
"last_advert": int.from_bytes(raw_data[132:136], "little", signed=False),
"adv_lat": int.from_bytes(raw_data[136:140], "little", signed=True) / 1_000_000,
"adv_lon": int.from_bytes(raw_data[140:144], "little", signed=True) / 1_000_000,
"lastmod": int.from_bytes(raw_data[144:148], "little", signed=False),
}
return {
"companion_kind": key_name,
payload_key: record,
"internal_only": True,
"protocol": "companion_radio",
}
def _decode_self_info(self, raw_data: bytes) -> Optional[Dict[str, Any]]:
minimum_length = 1 + 1 + 1 + 1 + 32 + 4 + 4 + 1 + 1 + 1 + 1 + 4 + 4 + 1 + 1
if len(raw_data) < minimum_length:
self.logger.warning(
"Companion self-info frame too short to decode: %s",
raw_data.hex(),
)
return None
telemetry_mode = raw_data[46]
return {
"companion_kind": "self_info",
"self_info": {
"adv_type": raw_data[1],
"tx_power": raw_data[2],
"max_tx_power": raw_data[3],
"public_key": raw_data[4:36].hex(),
"adv_lat": int.from_bytes(raw_data[36:40], "little", signed=True) / 1_000_000,
"adv_lon": int.from_bytes(raw_data[40:44], "little", signed=True) / 1_000_000,
"multi_acks": raw_data[44],
"adv_loc_policy": raw_data[45],
"telemetry_mode_env": (telemetry_mode >> 4) & 0b11,
"telemetry_mode_loc": (telemetry_mode >> 2) & 0b11,
"telemetry_mode_base": telemetry_mode & 0b11,
"manual_add_contacts": raw_data[47] > 0,
"radio_freq": int.from_bytes(raw_data[48:52], "little", signed=False) / 1000,
"radio_bw": int.from_bytes(raw_data[52:56], "little", signed=False) / 1000,
"radio_sf": raw_data[56],
"radio_cr": raw_data[57],
"name": self._decode_companion_string(raw_data[58:]),
},
"internal_only": True,
"protocol": "companion_radio",
}
def _decode_device_info(self, raw_data: bytes) -> Optional[Dict[str, Any]]:
if len(raw_data) < 2:
self.logger.warning(
"Companion device-info frame too short to decode: %s",
raw_data.hex(),
)
return None
fw_ver = raw_data[1]
device_info = {"fw_ver": fw_ver}
if fw_ver >= 3:
minimum_length = 1 + 1 + 1 + 1 + 4 + 12 + 40 + 20
if len(raw_data) < minimum_length:
self.logger.warning(
"Companion device-info payload too short for fw_ver %d: %s",
fw_ver,
raw_data.hex(),
)
return None
device_info.update(
{
"max_contacts": raw_data[2] * 2,
"max_channels": raw_data[3],
"ble_pin": int.from_bytes(raw_data[4:8], "little", signed=False),
"fw_build": self._decode_companion_string(raw_data[8:20]),
"model": self._decode_companion_string(raw_data[20:60]),
"ver": self._decode_companion_string(raw_data[60:80]),
}
)
return {
"companion_kind": "device_info",
"device_info": device_info,
"internal_only": True,
"protocol": "companion_radio",
}
def decode(self, raw_data: bytes) -> Optional[Dict[str, Any]]:
"""Decode companion payload bytes into a dict.
@@ -398,39 +510,37 @@ class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
}
if code == 0x02: # CONTACT_START
if len(raw_data) < 5:
return None
return {
"companion_kind": "contact_start",
"contact_count": int.from_bytes(raw_data[1:5], "little", signed=False),
"internal_only": True,
"protocol": "companion_radio",
}
if code == 0x03: # CONTACT_INFO
return {
"companion_kind": "contact_info",
"internal_only": True,
"protocol": "companion_radio",
}
return self._decode_contact_record(
raw_data,
key_name="contact_info",
payload_key="contact",
)
if code == 0x04: # CONTACT_END
if len(raw_data) < 5:
return None
return {
"companion_kind": "contact_end",
"lastmod": int.from_bytes(raw_data[1:5], "little", signed=False),
"internal_only": True,
"protocol": "companion_radio",
}
if code == 0x05: # SELF_INFO
return {
"companion_kind": "self_info",
"internal_only": True,
"protocol": "companion_radio",
}
return self._decode_self_info(raw_data)
if code == 0x0D: # DEVICE_INFO
return {
"companion_kind": "device_info",
"internal_only": True,
"protocol": "companion_radio",
}
return self._decode_device_info(raw_data)
# --- PUSH codes (unsolicited events from the radio) ---
@@ -438,7 +548,6 @@ class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
if len(raw_data) < 1 + 32:
return None
pubkey = raw_data[1:33]
self.logger.info("MeshCore advert from: %s", pubkey[:4].hex())
return {
"companion_kind": "advert",
"pubkey": pubkey.hex(),
@@ -474,16 +583,12 @@ class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
}
if code == 0x8A: # PUSH_CODE_NEW_ADVERT
if len(raw_data) < 1 + 32:
return None
pubkey = raw_data[1:33]
self.logger.info("MeshCore new advert from: %s", pubkey[:4].hex())
return {
"companion_kind": "new_advert",
"pubkey": pubkey.hex(),
"internal_only": True,
"protocol": "companion_radio",
}
decoded = self._decode_contact_record(
raw_data,
key_name="new_advert",
payload_key="advert",
)
return decoded
# Ignore non-message frames
self.logger.debug("Ignoring companion frame code: 0x%02x", code)
+19 -9
View File
@@ -444,6 +444,7 @@ def format_timestamp(value: object) -> str:
def build_config_rows(config: BridgeConfig) -> list[tuple[str, str]]:
"""Build the configuration summary shown in the side panel."""
serial_protocol = config.serial_protocol or "not configured"
rows = [
("Meshtastic Port", config.meshtastic_port or "disabled"),
("External Transport", config.external_transport.upper()),
@@ -461,19 +462,28 @@ def build_config_rows(config: BridgeConfig) -> list[tuple[str, str]]:
("Serial Baud", str(config.serial_baud or "--")),
(
"Serial Protocol",
(config.serial_protocol or "not configured").upper(),
),
(
"Companion Handshake",
_bool_label(config.companion_handshake_enabled),
),
(
"Companion Poll",
f"{config.companion_contacts_poll_s or 0}s",
serial_protocol.upper(),
),
("Auto Switch", _bool_label(config.serial_auto_switch)),
]
)
if serial_protocol.lower() == "companion_radio":
rows.extend(
[
(
"Companion Handshake",
_bool_label(config.companion_handshake_enabled),
),
(
"Companion Poll",
f"{config.companion_contacts_poll_s or 0}s",
),
(
"Companion Debug",
_bool_label(config.companion_debug),
),
]
)
else:
rows.extend(
[
+3 -2
View File
@@ -1,6 +1,6 @@
# Architecture Documentation
**Last Updated: May 18, 2026**
**Last Updated: May 19, 2026**
This document describes the architecture and design of the Akita Meshtastic Meshcore Bridge (AMMB).
@@ -60,7 +60,8 @@ The command center is implemented in `ammb/tui.py`. It launches the synchronous
5. **Protocol Handlers** (`ammb/protocol.py`)
- Abstract base class for serial protocols
- Implementations: `JsonNewlineProtocol`, `RawSerialProtocol`
- Implementations: `JsonNewlineProtocol`, `RawSerialProtocol`, `MeshcoreCompanionProtocol`
- `MeshcoreCompanionProtocol` decodes both user messages and MeshCore management frames such as contact sync records, self info, device info, and adverts into structured bridge events
- Extensible for custom protocols
6. **Configuration Handler** (`ammb/config_handler.py`)
+25 -3
View File
@@ -1,6 +1,6 @@
# Configuration Guide
**Last Updated: April 21, 2026**
**Last Updated: May 19, 2026**
The Akita Meshtastic Meshcore Bridge (AMMB) uses a configuration file named `config.ini` located in the project's root directory. Copy `examples/config.ini.example` to `config.ini` and modify it according to your setup.
@@ -51,8 +51,8 @@ These settings are only used when `EXTERNAL_TRANSPORT = serial`.
* **Description:** Specifies how messages are formatted over the serial connection.
* **Supported Values:**
* `json_newline`: Messages are newline-terminated UTF-8 JSON strings (default for structured data)
* `raw_serial`: Raw binary/text bytes forwarded as hex (for MeshCore Companion Mode)
* `companion_radio`: MeshCore Companion USB framing protocol (for MeshCore radios)
* `raw_serial`: Raw binary/text bytes forwarded as hex (useful for low-level passthrough or diagnostics)
* `companion_radio`: MeshCore Companion USB framing protocol with structured decoding for message, contact, self-info, device-info, and advert frames
* **Required:** Yes (when using serial transport)
* **Default:** `json_newline`
@@ -62,6 +62,28 @@ These settings are only used when `EXTERNAL_TRANSPORT = serial`.
* **Required:** No
* **Default:** `True`
### MeshCore Companion Settings
These settings only apply when `EXTERNAL_TRANSPORT = serial` and `SERIAL_PROTOCOL = companion_radio`.
* **`COMPANION_HANDSHAKE_ENABLED`**
* **Description:** Send MeshCore companion startup commands after the serial port opens so the bridge can query device capabilities and begin message polling.
* **Values:** `True`, `False`
* **Required:** No
* **Default:** `True`
* **`COMPANION_CONTACTS_POLL_S`**
* **Description:** Periodically request the MeshCore contact book to surface adverts and contact metadata in the bridge logs and dashboard log tail. Set to `0` to disable polling.
* **Range:** `0` and above (seconds)
* **Required:** No
* **Default:** `0`
* **`COMPANION_DEBUG`**
* **Description:** Enable verbose raw companion frame logging for troubleshooting USB framing and decode problems.
* **Values:** `True`, `False`
* **Required:** No
* **Default:** `False`
### MQTT Transport Settings
These settings are only used when `EXTERNAL_TRANSPORT = mqtt`.
+4 -2
View File
@@ -1,6 +1,6 @@
# Usage Guide
**Last Updated: May 18, 2026**
**Last Updated: May 19, 2026**
This guide explains how to run and interact with the Akita Meshtastic Meshcore Bridge (AMMB).
@@ -67,6 +67,7 @@ Notes:
* The bridge will attempt to connect to both the Meshtastic and external devices based on your `config.ini`
* If a connection fails initially (e.g., device not plugged in), it will log a warning or error and periodically retry in the background
* Once running, it will log messages received and sent on both networks (depending on your `LOG_LEVEL`)
* When using `SERIAL_PROTOCOL = companion_radio`, the logs also show structured MeshCore control events such as self info, device info, contact sync progress, and new adverts
### Full-Screen Terminal Command Center
@@ -113,6 +114,7 @@ If you are running `python run_bridge_tui.py`, the command center becomes the pr
* queue depth and message counters
* recent warnings and errors in the event feed
* the live log tail without leaving the dashboard
* companion protocol metadata such as MeshCore identity, device capabilities, and contact/advert discovery when `companion_radio` is enabled
### Log Level
@@ -190,7 +192,7 @@ Messages from Meshtastic are translated to the following format:
"timestamp_rx": 1704067200.0,
"rx_rssi": -85,
"rx_snr": 5.2,
"channel_index": 0
"channel_index": 0
}
```
+44 -4
View File
@@ -715,6 +715,42 @@ class TestMTtoMC_Encoding:
text = cmd_payload[7:].decode("utf-8")
assert text == "Hello MeshCore!"
@patch("ammb.meshcore_handler.serial.Serial")
def test_meshtastic_sender_id_is_prefixed_when_present(self, mock_serial_cls):
from ammb.meshcore_handler import MeshcoreHandler
to_mesh_q = Queue(maxsize=10)
from_mesh_q = Queue(maxsize=10)
shutdown = threading.Event()
config = _make_bridge_config()
mock_port = MagicMock()
mock_port.is_open = True
mock_serial_cls.return_value = mock_port
handler = MeshcoreHandler(config, to_mesh_q, from_mesh_q, shutdown)
handler.connect()
mock_port.write.reset_mock()
from_mesh_q.put(
{
"type": "meshtastic_message",
"payload": "Hello MeshCore!",
"channel_index": 0,
"sender_meshtastic_id": "!deadbeef",
}
)
shutdown_timer = threading.Timer(0.6, shutdown.set)
shutdown_timer.start()
handler._serial_sender_loop()
written = mock_port.write.call_args_list[-1].args[0]
length = written[1] | (written[2] << 8)
cmd_payload = written[3 : 3 + length]
assert cmd_payload[7:].decode("utf-8") == "!deadbeef: Hello MeshCore!"
@patch("ammb.meshcore_handler.serial.Serial")
def test_non_text_meshtastic_msg_skipped(self, mock_serial_cls):
"""Non-text Meshtastic messages should not be encoded."""
@@ -875,10 +911,12 @@ class TestProtocolRoundTrips:
assert decoded["companion_kind"] == "advert"
assert decoded["pubkey"].startswith("abab")
def test_log_rx_data_returns_none(self):
def test_log_rx_data_decoded_as_internal(self):
proto = MeshcoreCompanionProtocol()
decoded = proto.decode(_log_rx_data_payload())
assert decoded is None, "0x88 LOG_RX_DATA should return None"
assert decoded is not None
assert decoded["companion_kind"] == "log_data"
assert decoded["internal_only"] is True
def test_framing_write_read_symmetry(self):
"""Write an inbound frame, flip to outbound header, read it back."""
@@ -913,14 +951,16 @@ class TestReproduceOriginalBugs:
def test_0x88_no_longer_misidentified_as_channel_msg(self):
"""The user's log showed 0x88 (PUSH_CODE_LOG_RX_DATA) being decoded
as channel_index=49 (the SNR byte), which the validator rejected.
Verify it is now safely ignored."""
Verify it is now treated as an internal-only event instead."""
proto = MeshcoreCompanionProtocol()
# Simulate the kind of payload the radio sends for LOG_RX_DATA
# [0x88][SNR=0x31][...data...]
raw = bytes([0x88, 0x31, 0x00, 0x00, 0x01, 0x00, 0x00]) + b"Test log data"
decoded = proto.decode(raw)
assert decoded is None
assert decoded is not None
assert decoded["companion_kind"] == "log_data"
assert decoded["internal_only"] is True
def test_advert_no_longer_forwarded_as_text(self):
"""The user saw 'MC_ADVERT:db46 --------' on Meshtastic chat.
+56 -6
View File
@@ -261,12 +261,14 @@ class TestMeshcoreCompanionProtocol:
assert result["snr"] == 5
def test_decode_push_channel_msg(self, handler):
"""0x88 is PUSH_CODE_LOG_RX_DATA (not a channel msg) and should be ignored."""
"""0x88 is PUSH_CODE_LOG_RX_DATA and should stay internal-only."""
ts = 4000
ts_bytes = ts.to_bytes(4, "little")
raw = bytes([0x88, 1, 0, 0]) + ts_bytes + b"push channel"
result = handler.decode(raw)
assert result is None # 0x88 is not a channel message
assert result is not None
assert result["companion_kind"] == "log_data"
assert result["internal_only"] is True
def test_decode_push_contact_msg(self, handler):
"""0x87 is PUSH_CODE_STATUS_RESPONSE (not a contact msg) and should be ignored."""
@@ -340,12 +342,25 @@ class TestMeshcoreCompanionProtocol:
def test_decode_new_advert(self, handler):
pubkey = bytes(range(32))
raw = bytes([0x8A]) + pubkey
path = bytes.fromhex("aabb") + (b"\x00" * 62)
adv_name = b"Field Unit" + (b"\x00" * 22)
raw = (
bytes([0x8A])
+ pubkey
+ bytes([1, 0, 2])
+ path
+ adv_name
+ (77).to_bytes(4, "little")
+ int(34.123456 * 1_000_000).to_bytes(4, "little", signed=True)
+ int(-117.123456 * 1_000_000).to_bytes(4, "little", signed=True)
+ (78).to_bytes(4, "little")
)
result = handler.decode(raw)
assert result is not None
assert result["companion_kind"] == "new_advert"
assert result["internal_only"] is True
assert result["pubkey"] == pubkey.hex()
assert result["advert"]["public_key"] == pubkey.hex()
assert result["advert"]["adv_name"] == "Field Unit"
assert "payload" not in result
def test_decode_unknown_frame_returns_none(self, handler):
@@ -378,10 +393,12 @@ class TestMeshcoreCompanionProtocol:
assert result["internal_only"] is True
def test_decode_log_rx_data_ignored(self, handler):
"""PUSH_CODE_LOG_RX_DATA (0x88) should be ignored (return None)."""
"""PUSH_CODE_LOG_RX_DATA (0x88) should be decoded as internal-only."""
raw = bytes([0x88]) + b"\x00" * 50
result = handler.decode(raw)
assert result is None
assert result is not None
assert result["companion_kind"] == "log_data"
assert result["internal_only"] is True
# =========================================================================
@@ -745,6 +762,39 @@ class TestMeshcoreHandler:
handler._switch_protocol()
assert handler._protocol_name == original_protocol # unchanged
def test_format_companion_event_summaries(self):
from ammb.meshcore_handler import MeshcoreHandler
config = _make_bridge_config(serial_protocol="companion_radio")
handler = MeshcoreHandler(config, Queue(), Queue(), threading.Event())
contact_summary = handler._format_companion_event(
{
"companion_kind": "contact_info",
"contact": {
"adv_name": "Desk Node",
"public_key": "0011223344556677",
},
}
)
device_summary = handler._format_companion_event(
{
"companion_kind": "device_info",
"device_info": {
"model": "Akita MeshCore Board",
"ver": "2.1.1",
"max_contacts": 32,
"max_channels": 8,
},
}
)
assert contact_summary == "Companion contact discovered: Desk Node [00112233]."
assert device_summary == (
"Companion device info: Akita MeshCore Board 2.1.1 "
"(32 contacts, 8 channels)."
)
@patch("ammb.meshcore_handler.serial.Serial")
def test_close_serial(self, mock_serial_cls, handler_parts):
config, to_mesh_q, from_mesh_q, shutdown = handler_parts
+157
View File
@@ -8,6 +8,7 @@ import pytest
# Module to test
from ammb.protocol import (
JsonNewlineProtocol,
MeshcoreCompanionProtocol,
get_serial_protocol_handler,
)
@@ -20,6 +21,12 @@ def json_handler() -> JsonNewlineProtocol:
return JsonNewlineProtocol()
@pytest.fixture
def companion_handler() -> MeshcoreCompanionProtocol:
"""Provides an instance of the MeshcoreCompanionProtocol handler."""
return MeshcoreCompanionProtocol()
# Parameterize test data for encoding
encode_test_data = [
({"key": "value", "num": 123}, b'{"key": "value", "num": 123}\n'),
@@ -103,5 +110,155 @@ def test_get_protocol_handler_unsupported():
get_serial_protocol_handler("unknown_protocol")
def test_companion_decode_contact_book_frames(
companion_handler: MeshcoreCompanionProtocol,
):
pubkey = bytes(range(32))
out_path = bytes.fromhex("010203") + (b"\x00" * 61)
adv_name = b"Desk Node" + (b"\x00" * 23)
contact_info = (
bytes([0x03])
+ pubkey
+ bytes([2, 7, 3])
+ out_path
+ adv_name
+ (123456).to_bytes(4, "little")
+ int(51.501234 * 1_000_000).to_bytes(4, "little", signed=True)
+ int(-0.141234 * 1_000_000).to_bytes(4, "little", signed=True)
+ (654321).to_bytes(4, "little")
)
assert companion_handler.decode(bytes([0x02]) + (4).to_bytes(4, "little")) == {
"companion_kind": "contact_start",
"contact_count": 4,
"internal_only": True,
"protocol": "companion_radio",
}
assert companion_handler.decode(contact_info) == {
"companion_kind": "contact_info",
"contact": {
"public_key": pubkey.hex(),
"type": 2,
"flags": 7,
"out_path_len": 3,
"out_path": "010203",
"adv_name": "Desk Node",
"last_advert": 123456,
"adv_lat": pytest.approx(51.501234),
"adv_lon": pytest.approx(-0.141234),
"lastmod": 654321,
},
"internal_only": True,
"protocol": "companion_radio",
}
assert companion_handler.decode(bytes([0x04]) + (99).to_bytes(4, "little")) == {
"companion_kind": "contact_end",
"lastmod": 99,
"internal_only": True,
"protocol": "companion_radio",
}
def test_companion_decode_self_info(companion_handler: MeshcoreCompanionProtocol):
public_key = bytes(range(31, -1, -1))
payload = (
bytes([0x05, 3, 10, 20])
+ public_key
+ int(40.7128 * 1_000_000).to_bytes(4, "little", signed=True)
+ int(-74.0060 * 1_000_000).to_bytes(4, "little", signed=True)
+ bytes([1, 2, 0b011001, 1])
+ (915_000).to_bytes(4, "little")
+ (250_000).to_bytes(4, "little")
+ bytes([11, 5])
+ b"Bridge Node"
)
assert companion_handler.decode(payload) == {
"companion_kind": "self_info",
"self_info": {
"adv_type": 3,
"tx_power": 10,
"max_tx_power": 20,
"public_key": public_key.hex(),
"adv_lat": pytest.approx(40.7128),
"adv_lon": pytest.approx(-74.006),
"multi_acks": 1,
"adv_loc_policy": 2,
"telemetry_mode_env": 1,
"telemetry_mode_loc": 2,
"telemetry_mode_base": 1,
"manual_add_contacts": True,
"radio_freq": 915.0,
"radio_bw": 250.0,
"radio_sf": 11,
"radio_cr": 5,
"name": "Bridge Node",
},
"internal_only": True,
"protocol": "companion_radio",
}
def test_companion_decode_device_info_and_new_advert(
companion_handler: MeshcoreCompanionProtocol,
):
advert_pubkey = bytes.fromhex("aa" * 32)
advert_path = bytes.fromhex("beef") + (b"\x00" * 62)
advert_name = b"Field Unit" + (b"\x00" * 22)
advert_payload = (
bytes([0x8A])
+ advert_pubkey
+ bytes([1, 0, 2])
+ advert_path
+ advert_name
+ (77).to_bytes(4, "little")
+ int(34.123456 * 1_000_000).to_bytes(4, "little", signed=True)
+ int(-117.123456 * 1_000_000).to_bytes(4, "little", signed=True)
+ (78).to_bytes(4, "little")
)
device_payload = (
bytes([0x0D, 3, 16, 8])
+ (123456).to_bytes(4, "little")
+ b"20260519abcd"
+ b"Akita MeshCore Board".ljust(40, b"\x00")
+ b"2.1.1".ljust(20, b"\x00")
)
assert companion_handler.decode(advert_payload) == {
"companion_kind": "new_advert",
"advert": {
"public_key": advert_pubkey.hex(),
"type": 1,
"flags": 0,
"out_path_len": 2,
"out_path": "beef",
"adv_name": "Field Unit",
"last_advert": 77,
"adv_lat": pytest.approx(34.123456),
"adv_lon": pytest.approx(-117.123456),
"lastmod": 78,
},
"internal_only": True,
"protocol": "companion_radio",
}
assert companion_handler.decode(device_payload) == {
"companion_kind": "device_info",
"device_info": {
"fw_ver": 3,
"max_contacts": 32,
"max_channels": 8,
"ble_pin": 123456,
"fw_build": "20260519abcd",
"model": "Akita MeshCore Board",
"ver": "2.1.1",
},
"internal_only": True,
"protocol": "companion_radio",
}
# Add tests for other protocol handlers (e.g., PlainTextProtocol)
# when implemented.
+42 -4
View File
@@ -93,6 +93,26 @@ def test_build_config_rows_redacts_mqtt_password():
assert rows["MQTT Broker"] == "broker.example:1883"
def test_build_config_rows_only_show_companion_fields_for_companion_protocol():
standard_rows = dict(build_config_rows(make_config(serial_protocol="json_newline")))
companion_rows = dict(
build_config_rows(
make_config(
serial_protocol="companion_radio",
companion_contacts_poll_s=30,
companion_debug=True,
)
)
)
assert "Companion Handshake" not in standard_rows
assert "Companion Poll" not in standard_rows
assert "Companion Debug" not in standard_rows
assert companion_rows["Companion Handshake"] == "enabled"
assert companion_rows["Companion Poll"] == "30s"
assert companion_rows["Companion Debug"] == "enabled"
def test_bridge_controller_start_and_stop():
controller = BridgeController(make_config(), bridge_factory=FakeBridge)
@@ -126,8 +146,7 @@ def test_bridge_controller_reports_init_failure():
assert controller.snapshot().state == "error"
@pytest.mark.asyncio
async def test_bridge_dashboard_app_mounts():
def test_bridge_dashboard_app_mounts(monkeypatch):
config = make_config()
store = DashboardStore()
controller = BridgeController(
@@ -141,7 +160,26 @@ async def test_bridge_dashboard_app_mounts():
store=store,
)
async with app.run_test(size=(160, 48)) as pilot:
await pilot.pause()
refresh_calls = []
interval_calls = []
monkeypatch.setattr(
app,
"refresh_dashboard",
lambda: refresh_calls.append("refresh"),
)
monkeypatch.setattr(
app,
"set_interval",
lambda interval, callback: interval_calls.append((interval, callback)),
)
app.on_mount()
assert app.title == "Akita Mesh Bridge Command Center"
assert app.sub_title == config.external_transport.upper()
assert refresh_calls == ["refresh"]
assert interval_calls == [(1.0, app.refresh_dashboard)]
assert controller.snapshot().state == "running"
controller.stop(wait=True)