Implement companion client communication and statistics retrieval

- Added raw RX and trace completion handlers to push data to connected companion clients.
- Enhanced PacketRouter to deliver ACK packets to all companion bridges.
- Introduced methods for retrieving companion statistics based on different types.
- Updated constants for handling new commands and responses in the companion protocol.

This update improves the interaction between the repeater and companion clients, enabling better data flow and monitoring capabilities.
This commit is contained in:
agessaman
2026-02-13 22:26:05 -08:00
parent 15299bf374
commit c0dec9e80a
5 changed files with 397 additions and 65 deletions
+7 -4
View File
@@ -1,5 +1,7 @@
"""Companion frame protocol constants (MeshCore Companion Radio Protocol)."""
import base64
# Commands (app -> radio)
CMD_APP_START = 1
CMD_SEND_TXT_MSG = 2
@@ -127,9 +129,10 @@ ADV_TYPE_REPEATER = 2
ADV_TYPE_ROOM = 3
ADV_TYPE_SENSOR = 4
# Default Public channel PSK (from firmware)
# Default Public channel PSK (from firmware MeshCore/examples/companion_radio/MyMesh.cpp)
# Base64-encoded; decode to get the 16-byte secret used for MAC/AES
PUBLIC_GROUP_PSK = b"izOH6cXN6mrJ5e26oRXNcg=="
# Default public channel secret (hex) - used for channel 0 when no channels loaded
# Matches MeshCore firmware default for new radios
DEFAULT_PUBLIC_CHANNEL_SECRET = bytes.fromhex("8b3387e9c5cdea6ac9e5edbaa115cd72")
# Default public channel secret: base64-decode PUBLIC_GROUP_PSK so we match firmware
# (firmware uses decode_base64(psk) -> 16 bytes; HMAC key is that + 16 zero bytes)
DEFAULT_PUBLIC_CHANNEL_SECRET = base64.b64decode(PUBLIC_GROUP_PSK)
+268 -56
View File
@@ -23,15 +23,18 @@ from .constants import (
CMD_SET_FLOOD_SCOPE,
CMD_APP_START,
CMD_DEVICE_QUERY,
CMD_GET_ADVERT_PATH,
CMD_GET_BATT_AND_STORAGE,
CMD_GET_CONTACTS,
CMD_GET_STATS,
CMD_IMPORT_CONTACT,
CMD_REMOVE_CONTACT,
CMD_RESET_PATH,
CMD_SEND_BINARY_REQ,
CMD_SEND_CHANNEL_TXT_MSG,
CMD_SEND_LOGIN,
CMD_SEND_SELF_ADVERT,
CMD_SEND_TRACE_PATH,
CMD_SEND_TXT_MSG,
CMD_SET_ADVERT_LATLON,
CMD_SET_ADVERT_NAME,
@@ -46,9 +49,14 @@ from .constants import (
MAX_PATH_SIZE,
PUB_KEY_SIZE,
PUSH_CODE_ADVERT,
PUSH_CODE_BINARY_RESPONSE,
PUSH_CODE_NEW_ADVERT,
PUSH_CODE_LOG_RX_DATA,
PUSH_CODE_TRACE_DATA,
PUSH_CODE_MSG_WAITING,
PUSH_CODE_PATH_UPDATED,
PUSH_CODE_SEND_CONFIRMED,
RESP_CODE_ADVERT_PATH,
RESP_CODE_BATT_AND_STORAGE,
RESP_CODE_CHANNEL_INFO,
RESP_CODE_CHANNEL_MSG_RECV,
@@ -85,12 +93,16 @@ class CompanionFrameServer:
port: int = 5000,
bind_address: str = "0.0.0.0",
sqlite_handler=None,
local_hash: Optional[int] = None,
stats_getter=None,
):
self.bridge = bridge
self.companion_hash = companion_hash
self.port = port
self.bind_address = bind_address
self.sqlite_handler = sqlite_handler
self.local_hash = local_hash # Repeater's node hash; if path ends with this, we are final hop
self.stats_getter = stats_getter # Optional (stats_type: int) -> dict for companion stats
self._server: Optional[asyncio.Server] = None
self._client_writer: Optional[asyncio.StreamWriter] = None
self._client_reader: Optional[asyncio.StreamReader] = None
@@ -161,43 +173,120 @@ class CompanionFrameServer:
pubkey = bytes.fromhex(pubkey)
else:
pubkey = getattr(contact, "public_key", getattr(contact, "pub_key", b""))
if len(pubkey) >= 32:
_write_push(bytes([PUSH_CODE_ADVERT]) + pubkey[:32])
if isinstance(pubkey, str):
pubkey = bytes.fromhex(pubkey)
if len(pubkey) < 32:
return
_write_push(bytes([PUSH_CODE_ADVERT]) + pubkey[:32])
# Full contact push (PUSH_CODE_NEW_ADVERT) so app gets NEW_CONTACT and can add to list
if not isinstance(contact, dict) and hasattr(contact, "name") and contact.name:
pubkey_b = pubkey[:32] if isinstance(pubkey, bytes) else bytes.fromhex(str(pubkey))[:32]
name_b = (contact.name.encode("utf-8")[:32] if isinstance(contact.name, str) else contact.name[:32]).ljust(32, b"\x00")
opl = getattr(contact, "out_path_len", -1)
opl_byte = 0xFF if opl < 0 else min(opl, 255)
out_path = getattr(contact, "out_path", b"") or b""
if isinstance(out_path, str):
out_path = bytes.fromhex(out_path) if out_path else b""
elif isinstance(out_path, (list, bytearray)):
out_path = bytes(out_path)
out_path = out_path[:MAX_PATH_SIZE].ljust(MAX_PATH_SIZE, b"\x00")
adv_type = getattr(contact, "adv_type", 0)
flags = getattr(contact, "flags", 0)
last_advert = getattr(contact, "last_advert_timestamp", 0)
gps_lat = getattr(contact, "gps_lat", 0.0)
gps_lon = getattr(contact, "gps_lon", 0.0)
lastmod = getattr(contact, "lastmod", 0)
frame = (
bytes([PUSH_CODE_NEW_ADVERT])
+ pubkey_b
+ bytes([adv_type, flags, opl_byte])
+ out_path
+ name_b
+ struct.pack("<I", last_advert)
+ struct.pack("<i", int(gps_lat * 1e6))
+ struct.pack("<i", int(gps_lon * 1e6))
+ struct.pack("<I", lastmod)
)
_write_push(frame)
async def on_contact_path_updated(pub_key, path_len, path):
if isinstance(pub_key, bytes) and len(pub_key) >= 32:
_write_push(bytes([PUSH_CODE_PATH_UPDATED]) + pub_key[:32])
async def on_channel_message_received(channel_name, sender_name, message_text, timestamp, path_len=0):
channel_idx = 0
max_ch = getattr(
getattr(self.bridge, "channels", None), "max_channels", 40
)
for idx in range(max_ch):
ch = self.bridge.get_channel(idx)
if ch and ch.name == channel_name:
channel_idx = idx
break
if self.sqlite_handler:
self.sqlite_handler.companion_push_message(
self.companion_hash,
{
"sender_key": b"",
"text": message_text,
"timestamp": timestamp,
"txt_type": 0,
"is_channel": True,
"channel_idx": channel_idx,
"path_len": path_len,
},
)
# Message is already in bridge.message_queue; do not push to sqlite or the same
# message would be delivered twice (once from queue, once from sqlite on next sync).
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
async def on_binary_response(tag_bytes, response_data, parsed=None, request_type=None):
# PUSH_CODE_BINARY_RESPONSE: 0x8C + reserved(1) + tag(4) + response_payload
frame = (
bytes([PUSH_CODE_BINARY_RESPONSE, 0])
+ (tag_bytes if isinstance(tag_bytes, bytes) else struct.pack("<I", tag_bytes))
+ response_data
)
_write_push(frame)
self.bridge.on_message_received(on_message_received)
self.bridge.on_channel_message_received(on_channel_message_received)
self.bridge.on_send_confirmed(on_send_confirmed)
self.bridge.on_advert_received(on_advert_received)
self.bridge.on_contact_path_updated(on_contact_path_updated)
self.bridge.on_binary_response(on_binary_response)
def push_trace_data(
self,
path_len: int,
flags: int,
tag: int,
auth_code: int,
path_hashes: bytes,
path_snrs: bytes,
final_snr_byte: int,
) -> None:
"""Push PUSH_CODE_TRACE_DATA (0x89) to client. Matches firmware onTraceRecv() frame format."""
if not self._client_writer or self._client_writer.is_closing():
return
# Firmware: code(1) + reserved(1) + path_len(1) + flags(1) + tag(4) + auth(4) + path_hashes + path_snrs + final_snr(1)
path_sz = flags & 0x03
expected_snr_len = path_len >> path_sz
if len(path_snrs) != expected_snr_len:
logger.debug("push_trace_data: path_snrs len %s != expected %s", len(path_snrs), expected_snr_len)
return
data = (
bytes([PUSH_CODE_TRACE_DATA, 0, path_len, flags])
+ struct.pack("<II", tag & 0xFFFFFFFF, auth_code & 0xFFFFFFFF)
+ path_hashes
+ path_snrs
+ bytes([final_snr_byte & 0xFF])
)
try:
frame = bytes([FRAME_OUTBOUND_PREFIX]) + struct.pack("<H", len(data)) + data
self._client_writer.write(frame)
asyncio.create_task(self._drain_writer())
except Exception as e:
logger.debug("push_trace_data error: %s", e)
def push_rx_raw(self, snr: float, rssi: int, raw: bytes) -> None:
"""Push raw RX packet to client (PUSH_CODE_LOG_RX_DATA 0x88). Matches firmware logRxRaw() so client can track repeats by packet hash."""
if not self._client_writer or self._client_writer.is_closing():
logger.debug("push_rx_raw: no client connected (companion %s)", self.companion_hash)
return
# Firmware: code(1) + snr(1) + rssi(1) + raw; snr = (int8)(snr*4), rssi = (int8)rssi
snr_byte = max(-128, min(127, int(round(snr * 4))))
rssi_byte = max(-128, min(127, int(rssi)))
if snr_byte < 0:
snr_byte += 256
if rssi_byte < 0:
rssi_byte += 256
payload_len = min(len(raw), MAX_FRAME_SIZE - 3)
data = bytes([PUSH_CODE_LOG_RX_DATA, snr_byte & 0xFF, rssi_byte & 0xFF]) + raw[:payload_len]
try:
frame = bytes([FRAME_OUTBOUND_PREFIX]) + struct.pack("<H", len(data)) + data
self._client_writer.write(frame)
asyncio.create_task(self._drain_writer())
except Exception as e:
logger.debug("Push RX raw error: %s", e)
async def _drain_writer(self) -> None:
if self._client_writer:
@@ -322,12 +411,18 @@ class CompanionFrameServer:
await self._cmd_get_batt_and_storage(data)
elif cmd == CMD_GET_STATS:
await self._cmd_get_stats(data)
elif cmd == CMD_GET_ADVERT_PATH:
await self._cmd_get_advert_path(data)
elif cmd == CMD_IMPORT_CONTACT:
await self._cmd_import_contact(data)
elif cmd == CMD_GET_CHANNEL:
await self._cmd_get_channel(data)
elif cmd == CMD_SET_CHANNEL:
await self._cmd_set_channel(data)
elif cmd == CMD_SEND_BINARY_REQ:
await self._cmd_send_binary_req(data)
elif cmd == CMD_SEND_TRACE_PATH:
await self._cmd_send_trace_path(data)
elif cmd == CMD_SET_FLOOD_SCOPE:
# App sends this on connect; no-op for repeater companion (no radio scope)
self._write_ok()
@@ -449,12 +544,93 @@ class CompanionFrameServer:
if len(data) < 6:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
# Protocol: txt_type(1) + channel_idx(1) + sender_timestamp(4) + text
# Protocol: txt_type(1) + channel_idx(1) + sender_timestamp(4) + text (matches firmware/meshcore_py)
txt_type = data[0]
channel_idx = data[1]
sender_ts = struct.unpack_from("<I", data, 2)[0]
text = data[6:].decode("utf-8", errors="replace").rstrip("\x00")
if txt_type != 0: # TXT_TYPE_PLAIN
logger.debug("CMD_SEND_CHANNEL_TXT_MSG: unsupported txt_type=%s", txt_type)
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
return
if self.bridge.get_channel(channel_idx) is None:
logger.info("CMD_SEND_CHANNEL_TXT_MSG: channel idx %s not found", channel_idx)
self._write_err(ERR_CODE_NOT_FOUND)
return
ok = await self.bridge.send_channel_message(channel_idx, text)
self._write_ok() if ok else self._write_err(ERR_CODE_BAD_STATE)
if ok:
self._write_ok()
else:
logger.warning("CMD_SEND_CHANNEL_TXT_MSG: send failed for channel %s", channel_idx)
self._write_err(ERR_CODE_BAD_STATE)
async def _cmd_send_binary_req(self, data: bytes) -> None:
# CMD_SEND_BINARY_REQ: pubkey(32) + req_data (request_type(1) + optional payload)
if len(data) < 33:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:32]
req_data = data[32:]
send_binary_req = getattr(self.bridge, "send_binary_req", None)
if not send_binary_req:
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
return
try:
result = await send_binary_req(pubkey, req_data)
except Exception as e:
logger.error(f"send_binary_req error: {e}", exc_info=True)
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
if not result.success:
self._write_err(ERR_CODE_NOT_FOUND)
return
# RESP_CODE_SENT: 0x06 + flood(1) + tag(4 LE) + timeout(4 LE)
tag = result.expected_ack if result.expected_ack is not None else 0
timeout_ms = result.timeout_ms if result.timeout_ms is not None else 10000
frame = bytes([RESP_CODE_SENT, 1 if result.is_flood else 0]) + struct.pack("<II", tag, timeout_ms)
self._write_frame(frame)
async def _cmd_send_trace_path(self, data: bytes) -> None:
# CMD_SEND_TRACE_PATH: tag(4) + auth(4) + flags(1) + path_bytes (firmware MyMesh.cpp)
if len(data) < 10:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
tag = struct.unpack_from("<I", data, 0)[0]
auth_code = struct.unpack_from("<I", data, 4)[0]
flags = data[8]
path_bytes = data[9:]
path_len = len(path_bytes)
path_sz = flags & 0x03
# Firmware: (path_len >> path_sz) <= MAX_PATH_SIZE and path_len % (1 << path_sz) == 0
if (path_len >> path_sz) > MAX_PATH_SIZE or (path_len % (1 << path_sz)) != 0:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
send_raw = getattr(self.bridge, "send_trace_path_raw", None)
if not send_raw:
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
return
try:
ok = await send_raw(tag, auth_code, flags, path_bytes)
except Exception as e:
logger.error(f"send_trace_path error: {e}", exc_info=True)
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
if not ok:
self._write_err(ERR_CODE_TABLE_FULL)
return
# RESP_CODE_SENT + 0 (not flood) + tag(4) + est_timeout(4) = 10 bytes (firmware)
est_timeout_ms = 5000 + (path_len * 200)
frame = bytes([RESP_CODE_SENT, 0]) + struct.pack("<II", tag, est_timeout_ms)
self._write_frame(frame)
# If we are the final hop (path ends with our node), we never receive our own TX; push 0x89 now.
if path_bytes and self.local_hash is not None and path_bytes[-1] == self.local_hash:
path_sz = flags & 0x03
snr_len = path_len >> path_sz
path_snrs = bytes(snr_len) # no RX SNR when we're the sender
final_snr_byte = 0
self.push_trace_data(
path_len, flags, tag, auth_code, path_bytes, path_snrs, final_snr_byte
)
async def _cmd_sync_next_message(self, data: bytes) -> None:
msg = self.bridge.sync_next_message()
@@ -474,20 +650,22 @@ class CompanionFrameServer:
self._write_frame(bytes([RESP_CODE_NO_MORE_MESSAGES]))
return
if msg.is_channel:
# Layout must match meshcore_py reader.py (PacketType.CHANNEL_MSG_RECV and type 17)
# so client can group repeats by (channel_idx, sender_timestamp, text); path_len differs per repeat.
path_len_byte = msg.path_len if msg.path_len < 256 else 0xFF
txt_type = 0 # TXT_TYPE_PLAIN
text_bytes = msg.text.encode("utf-8", errors="replace")
text_bytes = (msg.text or "").rstrip("\x00").encode("utf-8", errors="replace")
if self._app_target_ver >= 3:
# RESP_CODE_CHANNEL_MSG_RECV_V3: code + snr + reserved(2) + channel_idx + path_len + txt_type + timestamp + text
# V3: code(1) + snr(1) + reserved(2) + channel_idx(1) + path_len(1) + txt_type(1) + timestamp(4) + text
frame = bytes([
RESP_CODE_CHANNEL_MSG_RECV_V3,
0, # snr (we don't have it when popping from queue)
0, 0, # reserved
0, 0, 0, # snr + reserved
msg.channel_idx,
path_len_byte,
txt_type,
]) + struct.pack("<I", msg.timestamp) + text_bytes
else:
# Type 8: code(1) + channel_idx(1) + path_len(1) + txt_type(1) + timestamp(4) + text
frame = bytes([RESP_CODE_CHANNEL_MSG_RECV, msg.channel_idx, path_len_byte, txt_type])
frame += struct.pack("<I", msg.timestamp) + text_bytes
else:
@@ -572,16 +750,19 @@ class CompanionFrameServer:
if ok and self.sqlite_handler:
self._save_contacts()
self._write_ok() if ok else self._write_err(ERR_CODE_TABLE_FULL)
await self._drain_writer()
async def _cmd_remove_contact(self, data: bytes) -> None:
if len(data) < 32:
self._write_err(ERR_CODE_ILLEGAL_ARG)
await self._drain_writer()
return
pubkey = data[:32]
ok = self.bridge.remove_contact(pubkey)
if ok and self.sqlite_handler:
self._save_contacts()
self._write_ok() if ok else self._write_err(ERR_CODE_NOT_FOUND)
await self._drain_writer()
async def _cmd_reset_path(self, data: bytes) -> None:
if len(data) < 32:
@@ -599,22 +780,58 @@ class CompanionFrameServer:
self._write_frame(frame)
async def _cmd_get_stats(self, data: bytes) -> None:
# CMD_GET_STATS (56): data[0] = stats_type (0=core, 1=radio, 2=packets). Firmware MyMesh.cpp + meshcore_py reader.
stats_type = data[0] if len(data) >= 1 else STATS_TYPE_PACKETS
stats = self.bridge.get_stats(stats_type)
if stats_type not in (STATS_TYPE_CORE, STATS_TYPE_RADIO, STATS_TYPE_PACKETS):
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
stats = (self.stats_getter(stats_type) if self.stats_getter else None) or self.bridge.get_stats(stats_type)
frame = bytes([RESP_CODE_STATS, stats_type])
if stats_type == STATS_TYPE_CORE:
frame += struct.pack("<h", 0)
frame += struct.pack("<I", int(stats.get("uptime_secs", 0)))
frame += struct.pack("<H", 0)
frame += bytes([stats.get("queue_len", 0)])
# Format: battery_mv(H) + uptime_secs(I) + errors(H) + queue_len(B) = 9 bytes (meshcore_py <H I H B>)
battery_mv = int(stats.get("battery_mv", 0))
uptime_secs = int(stats.get("uptime_secs", 0))
errors = int(stats.get("errors", 0))
queue_len = min(255, max(0, int(stats.get("queue_len", 0))))
frame += struct.pack("<H I H B", battery_mv, uptime_secs, errors, queue_len)
elif stats_type == STATS_TYPE_RADIO:
frame += struct.pack("<h", stats.get("last_rssi", 0) or 0)
frame += bytes([stats.get("last_rssi", 0) or 0, int((stats.get("last_snr", 0) or 0) * 4)])
frame += struct.pack("<II", 0, 0)
# Format: noise_floor(h) + last_rssi(b) + last_snr(b, SNR*4) + tx_air_secs(I) + rx_air_secs(I) = 12 bytes
noise_floor = int(stats.get("noise_floor", 0))
last_rssi = max(-128, min(127, int(stats.get("last_rssi", 0))))
last_snr_scaled = max(-128, min(127, int(round((stats.get("last_snr") or 0) * 4))))
tx_air_secs = int(stats.get("tx_air_secs", 0))
rx_air_secs = int(stats.get("rx_air_secs", 0))
frame += struct.pack("<h b b I I", noise_floor, last_rssi, last_snr_scaled, tx_air_secs, rx_air_secs)
else:
frame += struct.pack("<II", stats.get("num_recv_packets", 0), stats.get("num_sent_packets", 0))
frame += struct.pack("<II", stats.get("num_sent_flood", 0), stats.get("num_sent_direct", 0))
frame += struct.pack("<II", stats.get("num_recv_flood", 0), stats.get("num_recv_direct", 0))
# STATS_TYPE_PACKETS: recv(I)+sent(I)+flood_tx(I)+direct_tx(I)+flood_rx(I)+direct_rx(I)+recv_errors(I) = 28 bytes
recv = int(stats.get("recv", 0))
sent = int(stats.get("sent", 0))
flood_tx = int(stats.get("flood_tx", 0))
direct_tx = int(stats.get("direct_tx", 0))
flood_rx = int(stats.get("flood_rx", 0))
direct_rx = int(stats.get("direct_rx", 0))
recv_errors = int(stats.get("recv_errors", 0))
frame += struct.pack("<I I I I I I I", recv, sent, flood_tx, direct_tx, flood_rx, direct_rx, recv_errors)
self._write_frame(frame)
async def _cmd_get_advert_path(self, data: bytes) -> None:
# CMD_GET_ADVERT_PATH (42): reserved(1) + pub_key(32). Return inbound path from advert_paths (path_cache).
# Firmware: RESP_CODE_ADVERT_PATH(1) + recv_timestamp(4 LE) + path_len(1) + path(path_len)
if len(data) < 1 + PUB_KEY_SIZE:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pub_key = data[1 : 1 + PUB_KEY_SIZE]
prefix = pub_key[:7]
found = self.bridge.get_advert_path(prefix) if getattr(self.bridge, "get_advert_path", None) else None
if not found:
self._write_err(ERR_CODE_NOT_FOUND)
return
path_bytes = getattr(found, "path", None) or b""
if not isinstance(path_bytes, bytes):
path_bytes = bytes(path_bytes)
path_len = min(len(path_bytes), MAX_PATH_SIZE)
recv_ts = getattr(found, "recv_timestamp", 0)
frame = bytes([RESP_CODE_ADVERT_PATH]) + struct.pack("<I", recv_ts) + bytes([path_len]) + path_bytes[:path_len]
self._write_frame(frame)
async def _cmd_import_contact(self, data: bytes) -> None:
@@ -633,26 +850,23 @@ class CompanionFrameServer:
f"CMD_GET_CHANNEL: idx={channel_idx}, data_len={len(data)}, get_full_list={get_full_list}"
)
def _channel_info_frame(idx: int, ch, include_idx: bool = True) -> bytes:
# Frame format per firmware & meshcore_py: code(1) + channel_idx(1) + name(32) + secret(16)
def _channel_info_frame(idx: int, ch) -> bytes:
if ch is None:
name = b"\x00" * 32
secret = b"\x00" * 32
secret = b"\x00" * 16
else:
name = ch.name.encode("utf-8", errors="replace")[:32].ljust(
32, b"\x00"
)
secret = (
ch.secret[:32].ljust(32, b"\x00") if ch.secret else b"\x00" * 32
)
if include_idx:
return bytes([RESP_CODE_CHANNEL_INFO, idx]) + name + secret
# Some apps expect code + name(32) + secret(32) only (no index byte)
return bytes([RESP_CODE_CHANNEL_INFO]) + name + secret
# Firmware and meshcore_py use 16-byte (128-bit) secret in the frame
secret = (ch.secret[:16] if ch.secret else b"\x00" * 16).ljust(16, b"\x00")
return bytes([RESP_CODE_CHANNEL_INFO, idx]) + name + secret
if get_full_list:
for idx in range(max_channels_val):
ch = self.bridge.get_channel(idx)
frame = _channel_info_frame(idx, ch, include_idx=True)
frame = _channel_info_frame(idx, ch)
self._write_frame(frame)
logger.debug(f"CMD_GET_CHANNEL: sent full list ({max_channels_val} slots)")
return
@@ -665,10 +879,8 @@ class CompanionFrameServer:
if ch is None:
logger.debug(f"CMD_GET_CHANNEL: returning empty slot {channel_idx}")
else:
logger.debug(f"CMD_GET_CHANNEL: returning {ch.name!r}, secret_len=32")
# Send code + name(32) + secret(32) without channel_idx so the app sees
# name at offset 1 (avoids app treating channel_idx as first byte of name).
frame = _channel_info_frame(channel_idx, ch, include_idx=False)
logger.debug(f"CMD_GET_CHANNEL: returning {ch.name!r}, secret_len=16")
frame = _channel_info_frame(channel_idx, ch)
self._write_frame(frame)
async def _cmd_set_channel(self, data: bytes) -> None:
@@ -708,7 +920,7 @@ class CompanionFrameServer:
self._save_channels()
logger.debug(f"CMD_SET_CHANNEL: set_channel ok={ok}")
self._write_ok() if ok else self._write_err(ERR_CODE_TABLE_FULL)
self._write_ok() if ok else self._write_err(ERR_CODE_NOT_FOUND)
def _save_channels(self) -> None:
"""Persist channels to SQLite."""
+9
View File
@@ -38,6 +38,9 @@ class TraceHelper:
# Ping callback system - track pending ping requests by tag
self.pending_pings = {} # {tag: {'event': asyncio.Event(), 'result': dict, 'target': int, 'sent_at': float}}
# Optional: when trace reaches final node, call this (packet, parsed_data) to push 0x89 to companions
self.on_trace_complete = None # async (packet, parsed_data) -> None
# Create TraceHandler internally as a parsing utility
self.trace_handler = TraceHandler(log_fn=log_fn or logger.info)
@@ -107,6 +110,12 @@ class TraceHelper:
else:
# This is the final destination or can't forward - just log and record
self._log_no_forward_reason(packet, trace_path, trace_path_len)
# When trace completed (reached end of path), push PUSH_CODE_TRACE_DATA (0x89) to companions (firmware onTraceRecv)
if packet.path_len >= trace_path_len and self.on_trace_complete:
try:
await self.on_trace_complete(packet, parsed_data)
except Exception as e:
logger.debug("on_trace_complete error: %s", e)
except Exception as e:
logger.error(f"Error processing trace packet: {e}")
+94
View File
@@ -2,6 +2,7 @@ import asyncio
import logging
import os
import sys
import time
from repeater.config import get_radio_for_board, load_config
from repeater.config_manager import ConfigManager
@@ -261,6 +262,17 @@ class RepeaterDaemon:
# Load companion identities (CompanionBridge + frame server per companion)
await self._load_companion_identities()
# Subscribe to raw RX in pyMC_core so we can push PUSH_CODE_LOG_RX_DATA to companion clients
self.dispatcher.add_raw_rx_subscriber(self._on_raw_rx_for_companions)
n = len(getattr(self, "companion_frame_servers", []))
logger.info(
"Raw RX subscriber registered (%s companion frame server(s)). Connect a client to see rx_log (0x88).",
n,
)
# When trace reaches final node, push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv)
self.trace_helper.on_trace_complete = self._on_trace_complete_for_companions
except Exception as e:
logger.error(f"Failed to initialize dispatcher: {e}")
raise
@@ -435,6 +447,8 @@ class RepeaterDaemon:
port=tcp_port,
bind_address=bind_address,
sqlite_handler=sqlite_handler,
local_hash=self.local_hash,
stats_getter=self._get_companion_stats,
)
await frame_server.start()
self.companion_frame_servers.append(frame_server)
@@ -454,6 +468,38 @@ class RepeaterDaemon:
except Exception as e:
logger.error(f"Failed to load companion '{name}': {e}", exc_info=True)
async def _on_raw_rx_for_companions(self, data: bytes, rssi: int, snr: float) -> None:
"""Raw RX subscriber: push PUSH_CODE_LOG_RX_DATA (0x88) to connected companion clients."""
servers = getattr(self, "companion_frame_servers", [])
if not servers:
return
for fs in servers:
try:
fs.push_rx_raw(snr, rssi, data)
except Exception as e:
logger.debug("Push RX raw to companion: %s", e)
async def _on_trace_complete_for_companions(self, packet, parsed_data) -> None:
"""Trace completed at this node: push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv)."""
path_len = len(parsed_data.get("trace_path", []))
if path_len == 0:
return
path_hashes = bytes(parsed_data["trace_path"])
flags = parsed_data.get("flags", 0)
tag = parsed_data.get("tag", 0)
auth_code = parsed_data.get("auth_code", 0)
# path_snrs: exactly path_len bytes = (path_len-1) from forwarding hops + 1 (our receive SNR)
snr_scaled = max(-128, min(127, int(round(packet.get_snr() * 4))))
snr_byte = snr_scaled if snr_scaled >= 0 else (256 + snr_scaled)
path_snrs = bytes(packet.path)[: path_len - 1] + bytes([snr_byte])
for fs in getattr(self, "companion_frame_servers", []):
try:
fs.push_trace_data(
path_len, flags, tag, auth_code, path_hashes, path_snrs, snr_byte
)
except Exception as e:
logger.debug("Push trace data to companion: %s", e)
def _register_identity_everywhere(
self,
name: str,
@@ -553,6 +599,54 @@ class RepeaterDaemon:
return stats
def _get_companion_stats(self, stats_type: int) -> dict:
"""Return stats dict for companion CMD_GET_STATS (format expected by frame_server + meshcore_py)."""
from repeater.companion.constants import STATS_TYPE_CORE, STATS_TYPE_RADIO, STATS_TYPE_PACKETS
if not self.repeater_handler:
return {}
engine = self.repeater_handler
airtime = engine.airtime_mgr.get_stats()
uptime_secs = int(time.time() - engine.start_time)
queue_len = 0
for bridge in getattr(self, "companion_bridges", {}).values():
queue_len += getattr(getattr(bridge, "message_queue", None), "count", 0) or 0
if stats_type == STATS_TYPE_CORE:
return {
"battery_mv": 0,
"uptime_secs": uptime_secs,
"errors": 0,
"queue_len": min(255, queue_len),
}
if stats_type == STATS_TYPE_RADIO:
noise_floor = int(engine.get_noise_floor() or 0)
radio = getattr(self, "dispatcher", None) and getattr(self.dispatcher, "radio", None)
if radio:
_r = getattr(radio, "get_last_rssi", lambda: 0)
_s = getattr(radio, "get_last_snr", lambda: 0.0)
last_rssi = _r() if callable(_r) else _r
last_snr = _s() if callable(_s) else _s
else:
last_rssi, last_snr = 0, 0.0
tx_air_secs = int(airtime.get("total_airtime_ms", 0) / 1000)
return {
"noise_floor": noise_floor,
"last_rssi": int(last_rssi) if last_rssi is not None else 0,
"last_snr": float(last_snr) if last_snr is not None else 0.0,
"tx_air_secs": tx_air_secs,
"rx_air_secs": 0,
}
if stats_type == STATS_TYPE_PACKETS:
return {
"recv": getattr(engine, "rx_count", 0),
"sent": getattr(engine, "forwarded_count", 0),
"flood_tx": getattr(engine, "forwarded_count", 0),
"direct_tx": 0,
"flood_rx": getattr(engine, "rx_count", 0),
"direct_rx": 0,
"recv_errors": getattr(engine, "dropped_count", 0),
}
return {}
async def send_advert(self) -> bool:
if not self.dispatcher or not self.local_identity:
+19 -5
View File
@@ -4,6 +4,7 @@ import logging
from pymc_core.node.handlers.trace import TraceHandler
from pymc_core.node.handlers.control import ControlHandler
from pymc_core.node.handlers.advert import AdvertHandler
from pymc_core.node.handlers.ack import AckHandler
from pymc_core.node.handlers.login_server import LoginServerHandler
from pymc_core.node.handlers.text import TextMessageHandler
from pymc_core.node.handlers.path import PathHandler
@@ -43,17 +44,20 @@ class PacketRouter:
try:
metadata = {
"rssi": getattr(packet, "rssi", 0),
"snr": getattr(packet, "snr", 0.0),
"snr": getattr(packet, "snr", 0.0),
"timestamp": getattr(packet, "timestamp", 0),
}
# Use local_transmission=True to bypass forwarding logic
await self.daemon.repeater_handler(packet, metadata, local_transmission=True)
# Enqueue so router can deliver to companion(s): TXT_MSG -> dest bridge, ACK -> all bridges (sender sees ACK)
await self.enqueue(packet)
packet_len = len(packet.payload) if packet.payload else 0
logger.debug(f"Injected packet processed by engine as local transmission ({packet_len} bytes)")
return True
except Exception as e:
logger.error(f"Error injecting packet through engine: {e}")
return False
@@ -73,7 +77,7 @@ class PacketRouter:
payload_type = packet.get_payload_type()
processed_by_injection = False
# Route to specific handlers for parsing only
if payload_type == TraceHandler.payload_type():
# Process trace packet
@@ -113,6 +117,16 @@ class PacketRouter:
if handled:
processed_by_injection = True
elif payload_type == AckHandler.payload_type():
# ACK has no dest in payload (4-byte CRC only); deliver to all bridges so sender sees send_confirmed
companion_bridges = getattr(self.daemon, "companion_bridges", {})
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge ACK error: {e}")
processed_by_injection = True
elif payload_type == TextMessageHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})