feat: enhance companion radio support with new configuration options and message handling

This commit is contained in:
sh4un
2026-02-04 11:14:45 -05:00
parent 12bb53aa64
commit 098910e320
3 changed files with 319 additions and 12 deletions

View File

@@ -50,6 +50,11 @@ class BridgeConfig(NamedTuple):
mqtt_tls_ca_certs: Optional[str] = None
mqtt_tls_insecure: Optional[bool] = False
# Companion (Meshcore) Settings (Optional)
companion_handshake_enabled: Optional[bool] = True
companion_contacts_poll_s: Optional[int] = 0
companion_debug: Optional[bool] = False
CONFIG_FILE = "config.ini"
@@ -78,6 +83,9 @@ DEFAULT_CONFIG = {
"MQTT_TLS_ENABLED": "False",
"MQTT_TLS_CA_CERTS": "",
"MQTT_TLS_INSECURE": "False",
"COMPANION_HANDSHAKE_ENABLED": "True",
"COMPANION_CONTACTS_POLL_S": "0",
"COMPANION_DEBUG": "False",
}
VALID_LOG_LEVELS = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}
@@ -266,6 +274,22 @@ def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]:
"MQTT_TLS_INSECURE", fallback=False
)
# Companion settings
companion_handshake_enabled = cfg_section.getboolean(
"COMPANION_HANDSHAKE_ENABLED", fallback=True
)
try:
companion_contacts_poll_s = cfg_section.getint(
"COMPANION_CONTACTS_POLL_S", fallback=0
)
if companion_contacts_poll_s is None or companion_contacts_poll_s < 0:
companion_contacts_poll_s = 0
except ValueError:
companion_contacts_poll_s = 0
companion_debug = cfg_section.getboolean(
"COMPANION_DEBUG", fallback=False
)
bridge_config = BridgeConfig(
meshtastic_port=meshtastic_port,
external_transport=cast(
@@ -293,6 +317,9 @@ def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]:
mqtt_tls_enabled=mqtt_tls_enabled,
mqtt_tls_ca_certs=mqtt_tls_ca_certs,
mqtt_tls_insecure=mqtt_tls_insecure,
companion_handshake_enabled=companion_handshake_enabled,
companion_contacts_poll_s=companion_contacts_poll_s,
companion_debug=companion_debug,
)
logger.debug("Configuration loaded: %s", bridge_config)
return bridge_config

View File

@@ -51,6 +51,21 @@ class MeshcoreHandler:
)
self.sender_thread.start()
if (
self._protocol_name == "companion_radio"
and self._companion_contacts_poll_s > 0
):
if self._contacts_poll_thread and self._contacts_poll_thread.is_alive():
self.logger.warning("Companion contacts poll thread already started.")
else:
self.logger.info("Starting Companion contacts poll thread...")
self._contacts_poll_thread = threading.Thread(
target=self._contacts_poll_loop,
daemon=True,
name="CompanionContactsPoll",
)
self._contacts_poll_thread.start()
RECONNECT_DELAY_S = 10
AUTO_DETECT_FAILURE_THRESHOLD = 5
@@ -71,6 +86,7 @@ class MeshcoreHandler:
self.serial_port: Optional[serial.Serial] = None
self.receiver_thread: Optional[threading.Thread] = None
self.sender_thread: Optional[threading.Thread] = None
self._contacts_poll_thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
self._is_connected = threading.Event()
@@ -92,6 +108,15 @@ class MeshcoreHandler:
self._protocol_name = config.serial_protocol.lower()
self.logger.info(f"Selected serial protocol: {self._protocol_name}")
self._companion_handshake_enabled = getattr(
config, "companion_handshake_enabled", True
)
self._companion_contacts_poll_s = int(
getattr(config, "companion_contacts_poll_s", 0) or 0
)
self._companion_debug = bool(
getattr(config, "companion_debug", False)
)
self._failure_count = 0
self._auto_switched = False
self._auto_switch_enabled = getattr(config, "serial_auto_switch", True)
@@ -101,6 +126,8 @@ class MeshcoreHandler:
get_serial_protocol_handler(self._protocol_name)
)
self.logger.info(f"Initialized protocol handler: {type(self.protocol_handler).__name__}")
if self._protocol_name == "companion_radio" and not self._companion_debug:
self.protocol_handler.logger.setLevel(logging.INFO)
except ValueError as e:
self.logger.critical(
"Failed to initialize serial protocol handler '%s': %s.",
@@ -169,6 +196,11 @@ class MeshcoreHandler:
"Serial port %s opened successfully.",
self.config.serial_port,
)
if (
self._protocol_name == "companion_radio"
and self._companion_handshake_enabled
):
self._send_companion_handshake()
self._is_connected.set()
return True
else:
@@ -196,12 +228,58 @@ class MeshcoreHandler:
self._is_connected.clear()
return False
def _send_companion_handshake(self) -> None:
"""Send basic companion protocol handshake commands to elicit responses."""
try:
# CMD_DEVICE_QUERY (22) + app_target_ver (3)
self._send_companion_command(bytes([22, 3]), "CMD_DEVICE_QUERY")
# CMD_APP_START (1) + app_ver (3) + reserved(6) + app_name
app_name = b"AMMB"
payload = bytes([1, 3]) + (b"\x00" * 6) + app_name
self._send_companion_command(payload, "CMD_APP_START")
except Exception as e:
self.logger.warning("Failed to send companion handshake: %s", e)
def _send_companion_command(self, payload: bytes, label: str) -> None:
if not self.serial_port or not self.serial_port.is_open:
return
try:
encoded_message = self.protocol_handler.encode({"payload": payload})
if not encoded_message:
self.logger.warning("Failed to encode companion command: %s", label)
return
self.serial_port.write(encoded_message)
self.serial_port.flush()
self.logger.info("Sent companion command: %s", label)
except Exception as e:
self.logger.warning("Error sending companion command %s: %s", label, e)
def _contacts_poll_loop(self) -> None:
"""Periodically request contacts from MeshCore to surface adverts."""
self.logger.info(
"Companion contacts poll loop started (interval=%ss).",
self._companion_contacts_poll_s,
)
while not self.shutdown_event.is_set():
if not self._is_connected.is_set():
self.shutdown_event.wait(self.RECONNECT_DELAY_S)
continue
try:
# CMD_GET_CONTACTS (4) with no 'since' param
self._send_companion_command(bytes([4]), "CMD_GET_CONTACTS")
except Exception as e:
self.logger.warning("Contacts poll failed: %s", e)
self.shutdown_event.wait(self._companion_contacts_poll_s)
self.logger.info("Companion contacts poll loop stopped.")
def stop(self):
self.logger.info("Stopping Serial handler...")
if self.receiver_thread and self.receiver_thread.is_alive():
self.receiver_thread.join(timeout=2)
if self.sender_thread and self.sender_thread.is_alive():
self.sender_thread.join(timeout=5)
if self._contacts_poll_thread and self._contacts_poll_thread.is_alive():
self._contacts_poll_thread.join(timeout=2)
self._close_serial()
self.logger.info("Serial handler stopped.")
@@ -261,7 +339,8 @@ class MeshcoreHandler:
continue
if raw_data:
self.logger.debug(f"Serial RAW RX: {raw_data!r}")
if self._protocol_name != "companion_radio" or self._companion_debug:
self.logger.debug(f"Serial RAW RX: {raw_data!r}")
self.health_monitor.update_component(
"external", HealthStatus.HEALTHY, "Serial RX received"
)
@@ -271,6 +350,15 @@ class MeshcoreHandler:
self.logger.debug(f"Decoded serial message: {decoded_msg}")
if decoded_msg:
if decoded_msg.get("internal_only"):
self.logger.info(
"Companion event: %s",
decoded_msg.get("companion_kind"),
)
continue
if self._protocol_name == "companion_radio" and "payload" not in decoded_msg:
self.logger.debug("Skipping non-message companion frame.")
continue
is_valid, error_msg = (
self.validator.validate_external_message(
decoded_msg
@@ -392,10 +480,14 @@ class MeshcoreHandler:
)
if not item:
continue
encoded_message: Optional[bytes] = (
self.protocol_handler.encode(item)
)
encoded_message: Optional[bytes] = None
if self._protocol_name == "companion_radio":
encoded_message = self._encode_companion_from_meshtastic(item)
if encoded_message is None:
self.to_serial_queue.task_done()
continue
else:
encoded_message = self.protocol_handler.encode(item)
if encoded_message:
# Truncate log for binary safety
@@ -466,3 +558,33 @@ class MeshcoreHandler:
time.sleep(5)
self.logger.info("Serial sender loop stopped.")
def _encode_companion_from_meshtastic(self, item: Dict[str, Any]) -> Optional[bytes]:
"""Encode Meshtastic-originated messages into MeshCore companion commands."""
msg_type = item.get("type")
if msg_type != "meshtastic_message":
self.logger.debug(
"Skipping non-text Meshtastic message for companion: %s",
msg_type,
)
return None
payload = item.get("payload")
if not isinstance(payload, str):
self.logger.warning(
"Unsupported Meshtastic payload type for companion: %s",
type(payload),
)
return None
# CMD_SEND_CHANNEL_TXT_MSG (3)
txt_type = 0
channel_idx = int(item.get("channel_index", 0))
sender_ts = int(time.time())
text_bytes = payload.encode("utf-8")
cmd_payload = bytes([3, txt_type, channel_idx]) + sender_ts.to_bytes(4, "little") + text_bytes
encoded = self.protocol_handler.encode({"payload": cmd_payload})
if not encoded:
self.logger.warning("Failed to encode companion text message.")
return encoded

View File

@@ -235,15 +235,173 @@ class MeshcoreCompanionProtocol(MeshcoreProtocolHandler):
return None
def decode(self, raw_data: bytes) -> Optional[Dict[str, Any]]:
"""Wraps received payload bytes into a dict."""
"""Decode companion payload bytes into a dict.
Only forwards text-message frames; other frames are ignored.
"""
if not raw_data:
return None
return {
"destination_meshtastic_id": "^all",
"payload": raw_data,
"raw_binary": True,
"protocol": "companion_radio",
}
code = raw_data[0]
# Text message responses
if code == 7: # RESP_CODE_CONTACT_MSG_RECV
if len(raw_data) < 1 + 6 + 1 + 1 + 4:
return None
pubkey_prefix = raw_data[1:7]
path_len = raw_data[7]
txt_type = raw_data[8]
sender_ts = int.from_bytes(raw_data[9:13], "little", signed=False)
text_bytes = raw_data[13:]
text = text_bytes.decode("utf-8", errors="replace")
return {
"destination_meshtastic_id": "^all",
"payload": text,
"channel_index": 0,
"companion_kind": "contact_msg",
"sender_pubkey_prefix": pubkey_prefix.hex(),
"path_len": path_len,
"txt_type": txt_type,
"sender_timestamp": sender_ts,
"protocol": "companion_radio",
}
if code == 16: # RESP_CODE_CONTACT_MSG_RECV_V3
if len(raw_data) < 1 + 1 + 2 + 6 + 1 + 1 + 4:
return None
snr = raw_data[1]
pubkey_prefix = raw_data[4:10]
path_len = raw_data[10]
txt_type = raw_data[11]
sender_ts = int.from_bytes(raw_data[12:16], "little", signed=False)
text_bytes = raw_data[16:]
text = text_bytes.decode("utf-8", errors="replace")
return {
"destination_meshtastic_id": "^all",
"payload": text,
"channel_index": 0,
"companion_kind": "contact_msg",
"sender_pubkey_prefix": pubkey_prefix.hex(),
"path_len": path_len,
"txt_type": txt_type,
"snr": snr,
"sender_timestamp": sender_ts,
"protocol": "companion_radio",
}
if code == 8: # RESP_CODE_CHANNEL_MSG_RECV
if len(raw_data) < 1 + 1 + 1 + 1 + 4:
return None
channel_idx = raw_data[1]
path_len = raw_data[2]
txt_type = raw_data[3]
sender_ts = int.from_bytes(raw_data[4:8], "little", signed=False)
text_bytes = raw_data[8:]
text = text_bytes.decode("utf-8", errors="replace")
return {
"destination_meshtastic_id": "^all",
"payload": text,
"channel_index": channel_idx,
"companion_kind": "channel_msg",
"path_len": path_len,
"txt_type": txt_type,
"sender_timestamp": sender_ts,
"protocol": "companion_radio",
}
if code == 17: # RESP_CODE_CHANNEL_MSG_RECV_V3
if len(raw_data) < 1 + 1 + 2 + 1 + 1 + 1 + 4:
return None
snr = raw_data[1]
channel_idx = raw_data[4]
path_len = raw_data[5]
txt_type = raw_data[6]
sender_ts = int.from_bytes(raw_data[7:11], "little", signed=False)
text_bytes = raw_data[11:]
text = text_bytes.decode("utf-8", errors="replace")
return {
"destination_meshtastic_id": "^all",
"payload": text,
"channel_index": channel_idx,
"companion_kind": "channel_msg",
"path_len": path_len,
"txt_type": txt_type,
"snr": snr,
"sender_timestamp": sender_ts,
"protocol": "companion_radio",
}
if code == 0: # RESP_CODE_OK
return {
"companion_kind": "ok",
"internal_only": True,
"protocol": "companion_radio",
}
if code == 1: # RESP_CODE_ERR
err_code = raw_data[1] if len(raw_data) > 1 else None
return {
"companion_kind": "err",
"error_code": err_code,
"internal_only": True,
"protocol": "companion_radio",
}
if code == 6: # RESP_CODE_SENT
if len(raw_data) < 1 + 1 + 4 + 4:
return None
send_type = raw_data[1]
ack_tag = raw_data[2:6]
timeout_ms = int.from_bytes(raw_data[6:10], "little", signed=False)
return {
"companion_kind": "sent",
"send_type": send_type,
"ack_tag": ack_tag.hex(),
"timeout_ms": timeout_ms,
"internal_only": True,
"protocol": "companion_radio",
}
if code == 0x82: # PUSH_CODE_SEND_CONFIRMED
if len(raw_data) < 1 + 4 + 4:
return None
ack_code = raw_data[1:5]
round_trip = int.from_bytes(raw_data[5:9], "little", signed=False)
return {
"companion_kind": "send_confirmed",
"ack_code": ack_code.hex(),
"round_trip_ms": round_trip,
"internal_only": True,
"protocol": "companion_radio",
}
if code == 0x80: # PUSH_CODE_ADVERT
if len(raw_data) < 1 + 32:
return None
pubkey = raw_data[1:33]
return {
"destination_meshtastic_id": "^all",
"payload": f"MC_ADVERT:{pubkey.hex()}",
"channel_index": 0,
"companion_kind": "advert",
"protocol": "companion_radio",
}
if code == 0x8A: # PUSH_CODE_NEW_ADVERT
if len(raw_data) < 1 + 32:
return None
pubkey = raw_data[1:33]
return {
"destination_meshtastic_id": "^all",
"payload": f"MC_NEW_ADVERT:{pubkey.hex()}",
"channel_index": 0,
"companion_kind": "new_advert",
"protocol": "companion_radio",
}
# Ignore non-message frames (device info, acks, errors, etc.)
self.logger.debug("Ignoring companion frame code: %s", code)
return None
_serial_protocol_handlers = {
"json_newline": JsonNewlineProtocol,