Enhance control data delivery and discovery handling

- Added a new method `deliver_control_data` in `RepeaterDaemon` to push CONTROL payloads to companion clients.
- Updated `PacketRouter` to invoke the new delivery method for control data packets.
- Introduced `push_control_data` in `CompanionFrameServer` to handle the sending of control data to clients.
- Enhanced `DiscoveryHelper` to support optional debug logging for control handling.

These changes improve the communication and control flow between the repeater and companion clients, facilitating better discovery response handling.
This commit is contained in:
agessaman
2026-02-14 20:24:00 -08:00
parent c0dec9e80a
commit 49f412acb0
4 changed files with 430 additions and 39 deletions

View File

@@ -19,6 +19,7 @@ from .constants import (
RESP_CODE_DEVICE_INFO,
CMD_ADD_UPDATE_CONTACT,
CMD_GET_CHANNEL,
CMD_GET_CONTACT_BY_KEY,
CMD_SET_CHANNEL,
CMD_SET_FLOOD_SCOPE,
CMD_APP_START,
@@ -31,9 +32,13 @@ from .constants import (
CMD_REMOVE_CONTACT,
CMD_RESET_PATH,
CMD_SEND_BINARY_REQ,
CMD_SEND_PATH_DISCOVERY_REQ,
CMD_SEND_CONTROL_DATA,
CMD_SEND_CHANNEL_TXT_MSG,
CMD_SEND_LOGIN,
CMD_SEND_SELF_ADVERT,
CMD_SEND_STATUS_REQ,
CMD_SEND_TELEMETRY_REQ,
CMD_SEND_TRACE_PATH,
CMD_SEND_TXT_MSG,
CMD_SET_ADVERT_LATLON,
@@ -42,6 +47,7 @@ from .constants import (
ERR_CODE_BAD_STATE,
ERR_CODE_ILLEGAL_ARG,
ERR_CODE_NOT_FOUND,
ERR_CODE_TABLE_FULL,
ERR_CODE_UNSUPPORTED_CMD,
FRAME_INBOUND_PREFIX,
FRAME_OUTBOUND_PREFIX,
@@ -50,12 +56,16 @@ from .constants import (
PUB_KEY_SIZE,
PUSH_CODE_ADVERT,
PUSH_CODE_BINARY_RESPONSE,
PUSH_CODE_NEW_ADVERT,
PUSH_CODE_LOGIN_FAIL,
PUSH_CODE_LOGIN_SUCCESS,
PUSH_CODE_LOG_RX_DATA,
PUSH_CODE_NEW_ADVERT,
PUSH_CODE_TRACE_DATA,
PUSH_CODE_MSG_WAITING,
PUSH_CODE_PATH_UPDATED,
PUSH_CODE_SEND_CONFIRMED,
PUSH_CODE_STATUS_RESPONSE,
PUSH_CODE_TELEMETRY_RESPONSE,
RESP_CODE_ADVERT_PATH,
RESP_CODE_BATT_AND_STORAGE,
RESP_CODE_CHANNEL_INFO,
@@ -75,6 +85,8 @@ from .constants import (
STATS_TYPE_CORE,
STATS_TYPE_PACKETS,
STATS_TYPE_RADIO,
PUSH_CODE_PATH_DISCOVERY_RESPONSE,
PUSH_CODE_CONTROL_DATA,
)
logger = logging.getLogger("CompanionFrameServer")
@@ -95,6 +107,7 @@ class CompanionFrameServer:
sqlite_handler=None,
local_hash: Optional[int] = None,
stats_getter=None,
control_handler=None,
):
self.bridge = bridge
self.companion_hash = companion_hash
@@ -103,6 +116,7 @@ class CompanionFrameServer:
self.sqlite_handler = sqlite_handler
self.local_hash = local_hash # Repeater's node hash; if path ends with this, we are final hop
self.stats_getter = stats_getter # Optional (stats_type: int) -> dict for companion stats
self._control_handler = control_handler # Optional; used to register/clear discovery callbacks so "No callback waiting" is not logged
self._server: Optional[asyncio.Server] = None
self._client_writer: Optional[asyncio.StreamWriter] = None
self._client_reader: Optional[asyncio.StreamReader] = None
@@ -134,6 +148,13 @@ class CompanionFrameServer:
self._server = None
logger.info(f"Companion frame server stopped (port={self.port})")
def _persist_companion_message(self, msg_dict: dict) -> None:
"""Persist a message to SQLite and remove it from the bridge queue so it is delivered once from SQLite."""
if not self.sqlite_handler:
return
self.sqlite_handler.companion_push_message(self.companion_hash, msg_dict)
self.bridge.message_queue.pop_last()
def _setup_push_callbacks(self) -> None:
"""Subscribe to bridge events and send PUSH frames to connected client."""
@@ -147,19 +168,16 @@ class CompanionFrameServer:
logger.debug(f"Push write error: {e}")
async def on_message_received(sender_key, text, timestamp, txt_type):
if self.sqlite_handler:
self.sqlite_handler.companion_push_message(
self.companion_hash,
{
"sender_key": sender_key,
"text": text,
"timestamp": timestamp,
"txt_type": txt_type,
"is_channel": False,
"channel_idx": 0,
"path_len": 0,
},
)
msg_dict = {
"sender_key": sender_key,
"text": text,
"timestamp": timestamp,
"txt_type": txt_type,
"is_channel": False,
"channel_idx": 0,
"path_len": 0,
}
self._persist_companion_message(msg_dict)
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
async def on_send_confirmed(crc):
@@ -213,9 +231,19 @@ class CompanionFrameServer:
if isinstance(pub_key, bytes) and len(pub_key) >= 32:
_write_push(bytes([PUSH_CODE_PATH_UPDATED]) + pub_key[:32])
async def on_channel_message_received(channel_name, sender_name, message_text, timestamp, path_len=0):
# Message is already in bridge.message_queue; do not push to sqlite or the same
# message would be delivered twice (once from queue, once from sqlite on next sync).
async def on_channel_message_received(
channel_name, sender_name, message_text, timestamp, path_len=0, channel_idx=0
):
msg_dict = {
"sender_key": b"",
"text": message_text,
"timestamp": timestamp,
"txt_type": 0,
"is_channel": True,
"channel_idx": channel_idx,
"path_len": path_len,
}
self._persist_companion_message(msg_dict)
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
async def on_binary_response(tag_bytes, response_data, parsed=None, request_type=None):
@@ -227,12 +255,28 @@ class CompanionFrameServer:
)
_write_push(frame)
async def on_path_discovery_response(tag_bytes, contact_pubkey, out_path, in_path):
# PUSH_CODE_PATH_DISCOVERY_RESPONSE 0x8D: reserved(1) + pub_key_prefix(6) + out_path_len(1) + out_path + in_path_len(1) + in_path
pub_key_prefix = (contact_pubkey if isinstance(contact_pubkey, bytes) else bytes.fromhex(contact_pubkey))[:6]
out_path = out_path if isinstance(out_path, bytes) else bytes(out_path)
in_path = in_path if isinstance(in_path, bytes) else bytes(in_path)
frame = (
bytes([PUSH_CODE_PATH_DISCOVERY_RESPONSE, 0])
+ pub_key_prefix
+ bytes([len(out_path)])
+ out_path
+ bytes([len(in_path)])
+ in_path
)
_write_push(frame)
self.bridge.on_message_received(on_message_received)
self.bridge.on_channel_message_received(on_channel_message_received)
self.bridge.on_send_confirmed(on_send_confirmed)
self.bridge.on_advert_received(on_advert_received)
self.bridge.on_contact_path_updated(on_contact_path_updated)
self.bridge.on_binary_response(on_binary_response)
self.bridge.on_path_discovery_response(on_path_discovery_response)
def push_trace_data(
self,
@@ -288,6 +332,55 @@ class CompanionFrameServer:
except Exception as e:
logger.debug("Push RX raw error: %s", e)
async def push_control_data(
self,
snr: float,
rssi: int,
path_len: int,
path_bytes: bytes,
payload: bytes,
) -> None:
"""Push CONTROL packet to client (PUSH_CODE_CONTROL_DATA 0x8E). Spec: code, SNR*4, RSSI (signed), path_len, payload (no path bytes). Frame layout matches meshcore_py reader (PacketType.CONTROL_DATA) and firmware MyMesh::onControlDataRecv. See docs/companion-discovery.md for discovery payload layout."""
if not self._client_writer or self._client_writer.is_closing():
logger.warning("Push control data skipped: no client connection")
return
# Discovery response (0x90): clear the no-op callback we registered for this tag
if self._control_handler and len(payload) >= 6 and (payload[0] & 0xF0) == 0x90:
tag = struct.unpack("<I", payload[2:6])[0]
self._control_handler.clear_response_callback(tag)
# Wire format: int8 SNR (×4), int8 RSSI (dBm); same two's-complement byte as meshcore_py/firmware
snr_val = snr if isinstance(snr, (int, float)) else 0.0
rssi_val = rssi if isinstance(rssi, (int, float)) else 0
snr_byte = max(-128, min(127, int(round(float(snr_val) * 4))))
rssi_byte = max(-128, min(127, int(rssi_val)))
if snr_byte < 0:
snr_byte += 256
if rssi_byte < 0:
rssi_byte += 256
# Match firmware MyMesh::onControlDataRecv: code, snr, rssi, path_len, payload (no path bytes)
path_len_byte = max(0, min(255, int(path_len) if path_len is not None else 0))
payload_max = MAX_FRAME_SIZE - 4
payload_slice = bytes(payload[:payload_max]) if payload else b""
data = (
bytes([PUSH_CODE_CONTROL_DATA, snr_byte & 0xFF, rssi_byte & 0xFF, path_len_byte])
+ payload_slice
)
try:
frame = bytes([FRAME_OUTBOUND_PREFIX]) + struct.pack("<H", len(data)) + data
self._client_writer.write(frame)
await self._drain_writer()
logger.debug(
"Pushed control data 0x8E to client: payload_len=%s, frame_len=%s",
len(payload_slice), len(frame),
)
# DEBUG: exact wire format for comparison with protocol/firmware (code, snr, rssi, path_len, payload...)
logger.debug(
"0x8E frame data: %s (snr_byte=%s rssi_byte=%s path_len=%s)",
data.hex(), snr_byte & 0xFF, rssi_byte & 0xFF, path_len_byte,
)
except Exception as e:
logger.warning("Push control data error: %s", e)
async def _drain_writer(self) -> None:
if self._client_writer:
try:
@@ -377,6 +470,8 @@ class CompanionFrameServer:
return
cmd = payload[0]
data = payload[1:]
# Log every command at INFO so discovery (52) and unsupported are visible in logs
logger.info("Companion cmd 0x%02x (%s) len=%s", cmd, cmd, len(payload))
if cmd in (CMD_GET_CHANNEL, CMD_SET_CHANNEL):
logger.debug(f"Companion cmd 0x{cmd:02x} ({'GET_CHANNEL' if cmd == CMD_GET_CHANNEL else 'SET_CHANNEL'}), payload_len={len(payload)}")
@@ -387,6 +482,8 @@ class CompanionFrameServer:
await self._cmd_device_query(data)
elif cmd == CMD_GET_CONTACTS:
await self._cmd_get_contacts(data)
elif cmd == CMD_GET_CONTACT_BY_KEY:
await self._cmd_get_contact_by_key(data)
elif cmd == CMD_SEND_TXT_MSG:
await self._cmd_send_txt_msg(data)
elif cmd == CMD_SEND_CHANNEL_TXT_MSG:
@@ -395,6 +492,10 @@ class CompanionFrameServer:
await self._cmd_sync_next_message(data)
elif cmd == CMD_SEND_LOGIN:
await self._cmd_send_login(data)
elif cmd == CMD_SEND_STATUS_REQ:
await self._cmd_send_status_req(data)
elif cmd == CMD_SEND_TELEMETRY_REQ:
await self._cmd_send_telemetry_req(data)
elif cmd == CMD_SEND_SELF_ADVERT:
await self._cmd_send_self_advert(data)
elif cmd == CMD_SET_ADVERT_NAME:
@@ -421,12 +522,20 @@ class CompanionFrameServer:
await self._cmd_set_channel(data)
elif cmd == CMD_SEND_BINARY_REQ:
await self._cmd_send_binary_req(data)
elif cmd == CMD_SEND_PATH_DISCOVERY_REQ:
await self._cmd_send_path_discovery_req(data)
elif cmd == CMD_SEND_CONTROL_DATA:
await self._cmd_send_control_data(data)
elif cmd == CMD_SEND_TRACE_PATH:
await self._cmd_send_trace_path(data)
elif cmd == CMD_SET_FLOOD_SCOPE:
# App sends this on connect; no-op for repeater companion (no radio scope)
self._write_ok()
else:
logger.warning(
"Companion unsupported cmd 0x%02x (%s) len=%s (expected 52 for path discovery)",
cmd, cmd, len(payload),
)
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
except Exception as e:
logger.error(f"Cmd 0x{cmd:02x} error: {e}", exc_info=True)
@@ -511,6 +620,35 @@ class CompanionFrameServer:
most_recent = max((c.lastmod for c in contacts), default=0)
self._write_frame(bytes([RESP_CODE_END_OF_CONTACTS]) + struct.pack("<I", most_recent))
async def _cmd_get_contact_by_key(self, data: bytes) -> None:
"""Handle CMD_GET_CONTACT_BY_KEY (0x1e): lookup by 32-byte pubkey, respond with RESP_CODE_CONTACT or ERR."""
if len(data) < PUB_KEY_SIZE:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:PUB_KEY_SIZE]
contact = self.bridge.contacts.get_by_key(pubkey) if hasattr(self.bridge.contacts, "get_by_key") else None
if not contact:
self._write_err(ERR_CODE_NOT_FOUND)
return
c = contact
pubkey_b = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
name = (c.name.encode("utf-8")[:32] if isinstance(c.name, str) else c.name[:32]).ljust(32, b"\x00")
opl = c.out_path_len if hasattr(c, "out_path_len") else -1
opl_byte = 0xFF if opl < 0 else min(opl, 255)
frame = (
bytes([RESP_CODE_CONTACT])
+ pubkey_b
+ bytes([c.adv_type if hasattr(c, "adv_type") else 0, c.flags if hasattr(c, "flags") else 0])
+ bytes([opl_byte])
+ (c.out_path[:MAX_PATH_SIZE] if hasattr(c, "out_path") and c.out_path else b"").ljust(MAX_PATH_SIZE, b"\x00")
+ name
+ struct.pack("<I", c.last_advert_timestamp if hasattr(c, "last_advert_timestamp") else 0)
+ struct.pack("<i", int((c.gps_lat if hasattr(c, "gps_lat") else 0) * 1e6))
+ struct.pack("<i", int((c.gps_lon if hasattr(c, "gps_lon") else 0) * 1e6))
+ struct.pack("<I", c.lastmod if hasattr(c, "lastmod") else 0)
)
self._write_frame(frame)
async def _cmd_send_txt_msg(self, data: bytes) -> None:
if len(data) < 12:
self._write_err(ERR_CODE_ILLEGAL_ARG)
@@ -590,6 +728,59 @@ class CompanionFrameServer:
frame = bytes([RESP_CODE_SENT, 1 if result.is_flood else 0]) + struct.pack("<II", tag, timeout_ms)
self._write_frame(frame)
async def _cmd_send_control_data(self, data: bytes) -> None:
# CMD_SEND_CONTROL_DATA (55): first byte is flags/type (0x80 = DISCOVER_REQ). Firmware: (cmd_frame[1] & 0x80) != 0.
if len(data) < 2:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
if (data[0] & 0x80) == 0:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
# Discovery request: register a no-op response callback so ControlHandler won't log "No callback waiting"
if self._control_handler and len(data) >= 6 and (data[0] & 0xF0) == 0x80:
tag = struct.unpack("<I", data[2:6])[0]
self._control_handler.set_response_callback(tag, lambda _: None)
send_control = getattr(self.bridge, "send_control_data", None)
if not send_control:
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
return
try:
ok = await send_control(data)
except Exception as e:
logger.error("send_control_data error: %s", e, exc_info=True)
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
if ok:
self._write_ok()
await self._drain_writer()
else:
self._write_err(ERR_CODE_TABLE_FULL)
async def _cmd_send_path_discovery_req(self, data: bytes) -> None:
# CMD_SEND_PATH_DISCOVERY_REQ (52): reserved(1) + pub_key(32). Firmware: cmd_frame[1]==0, cmd_frame[2:34]=pub_key.
logger.info("Path discovery request received (cmd 52), data_len=%s", len(data))
if len(data) < 33:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pub_key = data[1:33]
send_req = getattr(self.bridge, "send_path_discovery_req", None)
if not send_req:
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
return
try:
result = await send_req(pub_key)
except Exception as e:
logger.error("send_path_discovery_req error: %s", e, exc_info=True)
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
if not result.success:
self._write_err(ERR_CODE_NOT_FOUND)
return
tag = result.expected_ack if result.expected_ack is not None else 0
timeout_ms = result.timeout_ms if result.timeout_ms is not None else 10000
frame = bytes([RESP_CODE_SENT, 1 if result.is_flood else 0]) + struct.pack("<II", tag, timeout_ms)
self._write_frame(frame)
async def _cmd_send_trace_path(self, data: bytes) -> None:
# CMD_SEND_TRACE_PATH: tag(4) + auth(4) + flags(1) + path_bytes (firmware MyMesh.cpp)
if len(data) < 10:
@@ -683,11 +874,11 @@ class CompanionFrameServer:
self._write_frame(frame)
async def _cmd_send_login(self, data: bytes) -> None:
if len(data) < 33:
if len(data) < 32:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:32]
password = data[32:].decode("utf-8", errors="replace").rstrip("\x00")
password = data[32:].decode("utf-8", errors="replace").rstrip("\x00") if len(data) > 32 else ""
self._write_frame(bytes([RESP_CODE_SENT, 1]) + struct.pack("<II", 0, 10000))
result = await self.bridge.send_login(pubkey, password)
if result.get("success"):
@@ -700,6 +891,100 @@ class CompanionFrameServer:
else:
self._write_frame(bytes([PUSH_CODE_LOGIN_FAIL, 0]) + pubkey[:6])
async def _cmd_send_status_req(self, data: bytes) -> None:
# CMD_SEND_STATUS_REQ (27): pub_key(32).
# Firmware: cmd_frame[0]=CMD, pub_key = &cmd_frame[1]; len >= 1+PUB_KEY_SIZE
# data here is payload[1:] so data = pub_key(32). No reserved byte.
if len(data) < 32:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[0:32]
# Immediate RESP_CODE_SENT so client knows the request was dispatched
self._write_frame(bytes([RESP_CODE_SENT, 0]) + struct.pack("<II", 0, 15000))
await self._drain_writer()
result = await self.bridge.send_status_request(pubkey)
logger.debug(
f"STATUS_REQ result: success={result.get('success')}, "
f"stats_type={result.get('stats', {}).get('type', 'N/A')}, "
f"raw_bytes_len={len(result.get('stats', {}).get('raw_bytes', b''))}"
)
if not result.get("success"):
# No response or decryption failed — send empty status frame
self._write_frame(bytes([PUSH_CODE_STATUS_RESPONSE, 0]) + pubkey[:6])
return
# Push PUSH_CODE_STATUS_RESPONSE (0x87): reserved(1) + pub_key_prefix(6) + raw_stats_bytes
# The companion app expects the raw RepeaterStats struct (binary) after the prefix,
# exactly as the firmware forwards data[4:] from onContactResponse.
stats_data = result.get("stats", {})
raw_bytes = stats_data.get("raw_bytes", b"")
if not raw_bytes:
# No binary stats available — send empty status frame.
# NEVER send formatted text as raw bytes: the companion app
# interprets the frame as a binary RepeaterStats struct and
# would display garbled values.
logger.warning(
"STATUS_REQ: no raw_bytes in stats (type=%s), sending empty status frame",
stats_data.get("type", "unknown"),
)
self._write_frame(bytes([PUSH_CODE_STATUS_RESPONSE, 0]) + pubkey[:6])
return
logger.debug(
f"STATUS_REQ PUSH: raw_bytes={len(raw_bytes)}B, "
f"first 16: {raw_bytes[:16].hex() if len(raw_bytes) >= 16 else raw_bytes.hex()}"
)
self._write_frame(
bytes([PUSH_CODE_STATUS_RESPONSE, 0])
+ pubkey[:6]
+ raw_bytes
)
async def _cmd_send_telemetry_req(self, data: bytes) -> None:
# CMD_SEND_TELEMETRY_REQ (39): reserved(3) + pub_key(32) + optional flags(1).
# Firmware: cmd_frame[0]=CMD, reserved(3), pub_key = &cmd_frame[4]; len >= 4+PUB_KEY_SIZE
# data here is payload[1:] so data = reserved(3) + pub_key(32) + optional flags.
if len(data) < 35:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[3:35]
# The 3 reserved bytes (data[0..2]) are unused by firmware.
# Default to requesting all telemetry categories.
flags = 0x07 # request all: base + location + environment
want_base = bool(flags & 0x01)
want_location = bool(flags & 0x02)
want_environment = bool(flags & 0x04)
# Immediate RESP_CODE_SENT so client knows the request was dispatched
self._write_frame(bytes([RESP_CODE_SENT, 0]) + struct.pack("<II", 0, 15000))
await self._drain_writer()
result = await self.bridge.send_telemetry_request(
pubkey,
want_base=want_base,
want_location=want_location,
want_environment=want_environment,
)
if not result.get("success"):
self._write_frame(bytes([PUSH_CODE_TELEMETRY_RESPONSE, 0]) + pubkey[:6])
return
# Push PUSH_CODE_TELEMETRY_RESPONSE (0x8B): reserved(1) + pub_key_prefix(6) + raw_lpp_bytes
# Firmware forwards data[4:] (CayenneLPP bytes) from onContactResponse.
telem_data = result.get("telemetry_data", {})
raw_bytes = telem_data.get("raw_bytes", b"")
if not raw_bytes:
# No binary telemetry available — send empty telemetry frame.
# NEVER send formatted text as raw bytes: the companion app
# interprets the frame as CayenneLPP binary and would display
# garbled sensor values.
logger.warning(
"TELEMETRY_REQ: no raw_bytes in telemetry (type=%s), sending empty frame",
telem_data.get("type", "unknown"),
)
self._write_frame(bytes([PUSH_CODE_TELEMETRY_RESPONSE, 0]) + pubkey[:6])
return
self._write_frame(
bytes([PUSH_CODE_TELEMETRY_RESPONSE, 0])
+ pubkey[:6]
+ raw_bytes
)
async def _cmd_send_self_advert(self, data: bytes) -> None:
flood = len(data) >= 1 and data[0] == 1
ok = await self.bridge.advertise(flood=flood)
@@ -719,38 +1004,78 @@ class CompanionFrameServer:
self._write_ok()
async def _cmd_add_update_contact(self, data: bytes) -> None:
if len(data) < 73:
# Match meshcore minimum: 36 bytes (pubkey 32 + adv_type 1 + flags 1 + out_path_len 1).
if len(data) < 36:
self._write_err(ERR_CODE_ILLEGAL_ARG)
await self._drain_writer()
return
pubkey = data[0:32]
adv_type = data[32]
flags = data[33]
out_path_len = data[34]
out_path = data[35:35 + MAX_PATH_SIZE]
name_raw = data[35 + MAX_PATH_SIZE:35 + MAX_PATH_SIZE + 32]
out_path_len = struct.unpack_from("<b", data, 34)[0] # signed byte, -1 = unknown
# Safe parsing: only read fields that are present; use defaults to avoid reading past buffer.
out_path_end = 35 + MAX_PATH_SIZE
if len(data) >= out_path_end:
out_path = data[35:out_path_end].rstrip(b"\x00")
else:
out_path = data[35:len(data)].rstrip(b"\x00") if len(data) > 35 else b""
name_start = 35 + MAX_PATH_SIZE
name_end = name_start + 32
if len(data) >= name_end:
name_raw = data[name_start:name_end]
elif len(data) > name_start:
name_raw = data[name_start:len(data)].ljust(32, b"\x00")
else:
name_raw = b"\x00" * 32
name = name_raw.split(b"\x00")[0].decode("utf-8", errors="replace")
last_advert = struct.unpack_from("<I", data, 35 + MAX_PATH_SIZE + 32)[0]
last_advert = 0
if len(data) >= name_end + 4:
last_advert = struct.unpack_from("<I", data, name_end)[0]
gps_lat, gps_lon = 0.0, 0.0
if len(data) >= 35 + MAX_PATH_SIZE + 32 + 12:
gps_lat = struct.unpack_from("<i", data, 35 + MAX_PATH_SIZE + 32 + 4)[0] / 1e6
gps_lon = struct.unpack_from("<i", data, 35 + MAX_PATH_SIZE + 32 + 8)[0] / 1e6
if len(data) >= name_end + 4 + 8:
gps_lat = struct.unpack_from("<i", data, name_end + 4)[0] / 1e6
gps_lon = struct.unpack_from("<i", data, name_end + 8)[0] / 1e6
lastmod = int(time.time())
if len(data) >= name_end + 4 + 12:
lastmod = struct.unpack_from("<I", data, name_end + 12)[0]
contact = Contact(
public_key=pubkey,
name=name,
adv_type=adv_type,
flags=flags,
out_path_len=out_path_len,
out_path=out_path.rstrip(b"\x00"),
out_path=out_path,
last_advert_timestamp=last_advert,
lastmod=int(time.time()),
lastmod=lastmod,
gps_lat=gps_lat,
gps_lon=gps_lon,
)
ok = self.bridge.add_update_contact(contact)
if ok and self.sqlite_handler:
self._save_contacts()
# Send OK/ERR immediately so client gets confirmation (matches meshcore firmware).
self._write_ok() if ok else self._write_err(ERR_CODE_TABLE_FULL)
if ok:
# Echo RESP_CODE_CONTACT so client can add to list without CMD_GET_CONTACTS.
opl_byte = 0xFF if out_path_len < 0 or out_path_len > 255 else out_path_len
out_path_padded = (out_path[:MAX_PATH_SIZE] if out_path else b"").ljust(MAX_PATH_SIZE, b"\x00")
name_padded = (name.encode("utf-8")[:32] if isinstance(name, str) else name[:32]).ljust(32, b"\x00")
contact_frame = (
bytes([RESP_CODE_CONTACT])
+ pubkey
+ bytes([adv_type, flags, opl_byte])
+ out_path_padded
+ name_padded
+ struct.pack("<I", last_advert)
+ struct.pack("<i", int(gps_lat * 1e6))
+ struct.pack("<i", int(gps_lon * 1e6))
+ struct.pack("<I", lastmod)
)
self._write_frame(contact_frame)
await self._drain_writer()
if ok and self.sqlite_handler:
try:
self._save_contacts()
except Exception as e:
logger.warning("Save contacts after add/update failed: %s", e)
async def _cmd_remove_contact(self, data: bytes) -> None:
if len(data) < 32:

View File

@@ -21,6 +21,7 @@ class DiscoveryHelper:
packet_injector=None,
node_type: int = 2,
log_fn=None,
debug_log_fn=None,
):
"""
Initialize the discovery helper.
@@ -30,13 +31,18 @@ class DiscoveryHelper:
packet_injector: Callable to inject new packets into the router for sending
node_type: Node type identifier (2 = Repeater)
log_fn: Optional logging function for ControlHandler
debug_log_fn: Optional logging for verbose ControlHandler messages (e.g. callback
presence). Pass logger.debug to avoid INFO noise when forwarding to companions.
"""
self.local_identity = local_identity
self.packet_injector = packet_injector # Function to inject packets into router
self.node_type = node_type
# Create ControlHandler internally as a parsing utility
self.control_handler = ControlHandler(log_fn=log_fn or logger.info)
self.control_handler = ControlHandler(
log_fn=log_fn or logger.info,
debug_log_fn=debug_log_fn,
)
# Set up the request callback
self.control_handler.set_request_callback(self._on_discovery_request)

View File

@@ -164,6 +164,7 @@ class RepeaterDaemon:
packet_injector=self.router.inject_packet,
node_type=2,
log_fn=logger.info,
debug_log_fn=logger.debug,
)
logger.info("Discovery processing helper initialized")
else:
@@ -449,6 +450,7 @@ class RepeaterDaemon:
sqlite_handler=sqlite_handler,
local_hash=self.local_hash,
stats_getter=self._get_companion_stats,
control_handler=self.discovery_helper.control_handler if self.discovery_helper else None,
)
await frame_server.start()
self.companion_frame_servers.append(frame_server)
@@ -479,6 +481,33 @@ class RepeaterDaemon:
except Exception as e:
logger.debug("Push RX raw to companion: %s", e)
async def deliver_control_data(
self,
snr: float,
rssi: int,
path_len: int,
path_bytes: bytes,
payload_bytes: bytes,
) -> None:
"""Deliver CONTROL payload (e.g. discovery response) to companion clients (PUSH_CODE_CONTROL_DATA 0x8E)."""
# Only push discovery responses (0x90); client expects these, not the request (0x80)
if len(payload_bytes) < 6 or (payload_bytes[0] & 0xF0) != 0x90:
return
# Push every discovery response to the client, including our own (snr=0, rssi=0 = local node's response)
servers = getattr(self, "companion_frame_servers", [])
if not servers:
return
tag = int.from_bytes(payload_bytes[2:6], "little") if len(payload_bytes) >= 6 else 0
logger.debug(
"Delivering discovery response to %s companion(s): tag=0x%08X, len=%s",
len(servers), tag, len(payload_bytes),
)
for fs in servers:
try:
await fs.push_control_data(snr, rssi, path_len, path_bytes, payload_bytes)
except Exception as e:
logger.warning("Companion push_control_data error: %s", e)
async def _on_trace_complete_for_companions(self, packet, parsed_data) -> None:
"""Trace completed at this node: push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv)."""
path_len = len(parsed_data.get("trace_path", []))

View File

@@ -11,6 +11,7 @@ from pymc_core.node.handlers.path import PathHandler
from pymc_core.node.handlers.protocol_request import ProtocolRequestHandler
from pymc_core.node.handlers.group_text import GroupTextHandler
from pymc_core.node.handlers.protocol_response import ProtocolResponseHandler
from pymc_core.node.handlers.login_response import LoginResponseHandler
logger = logging.getLogger("PacketRouter")
class PacketRouter:
@@ -91,7 +92,16 @@ class PacketRouter:
if self.daemon.discovery_helper:
await self.daemon.discovery_helper.control_handler(packet)
packet.mark_do_not_retransmit()
# Deliver to companions via daemon (frame servers push PUSH_CODE_CONTROL_DATA 0x8E)
deliver = getattr(self.daemon, "deliver_control_data", None)
if deliver:
snr = getattr(packet, "_snr", None) or getattr(packet, "snr", 0.0)
rssi = getattr(packet, "_rssi", None) or getattr(packet, "rssi", 0)
path_len = getattr(packet, "path_len", 0) or 0
path_bytes = (bytes(getattr(packet, "path", [])) if getattr(packet, "path", None) is not None else b"")[:path_len]
payload_bytes = bytes(packet.payload) if packet.payload else b""
await deliver(snr, rssi, path_len, path_bytes, payload_bytes)
elif payload_type == AdvertHandler.payload_type():
# Process advertisement packet for neighbor tracking
if self.daemon.advert_helper:
@@ -106,7 +116,8 @@ class PacketRouter:
logger.debug(f"Companion bridge advert error: {e}")
elif payload_type == LoginServerHandler.payload_type():
# Route to companion if dest is a companion; else to login_helper
# Route to companion if dest is a companion; else to login_helper (for logging into this repeater).
# If dest is remote (no local handler), mark processed so we don't pass our own outbound login TX to the repeater as RX.
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
@@ -116,6 +127,9 @@ class PacketRouter:
handled = await self.daemon.login_helper.process_login_packet(packet)
if handled:
processed_by_injection = True
else:
# Login request for remote repeater (we already TXed it via inject); don't treat as RX.
processed_by_injection = True
elif payload_type == AckHandler.payload_type():
# ACK has no dest in payload (4-byte CRC only); deliver to all bridges so sender sees send_confirmed
@@ -147,11 +161,28 @@ class PacketRouter:
elif self.daemon.path_helper:
await self.daemon.path_helper.process_path_packet(packet)
elif payload_type == ProtocolResponseHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
elif payload_type == LoginResponseHandler.payload_type():
# PAYLOAD_TYPE_RESPONSE (0x01): login responses from remote repeaters.
# Deliver to all companion bridges so the bridge that initiated the login receives it.
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge LOGIN_RESPONSE error: {e}")
if companion_bridges:
processed_by_injection = True
elif payload_type == ProtocolResponseHandler.payload_type():
# PAYLOAD_TYPE_PATH (0x08): protocol responses (telemetry, binary, etc.).
# Deliver to all companion bridges (response dest_hash is the client, not the bridge).
companion_bridges = getattr(self.daemon, "companion_bridges", {})
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
if companion_bridges:
processed_by_injection = True
elif payload_type == ProtocolRequestHandler.payload_type():