Add companion module and API integration

- Add repeater/companion with frame server and constants
- Extend config, sqlite_handler, mesh_cli, packet_router for companion
- Update api_endpoints and auth_endpoints; adjust main entry

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
agessaman
2026-02-13 16:07:43 -08:00
parent 9eb2b8a50f
commit 15299bf374
11 changed files with 1408 additions and 74 deletions
+17 -1
View File
@@ -100,7 +100,23 @@ identities:
# longitude: 0.0
# admin_password: "social_admin_123"
# guest_password: "social_guest_123"
# Companion Identities
# Each companion exposes the MeshCore frame protocol over TCP for standard clients.
# One TCP client per companion at a time. Clients connect to repeater-ip:tcp_port.
companions:
# - name: "RepeaterCompanion"
# identity_key: "your_companion_identity_key_hex_here"
# settings:
# node_name: "RepeaterCompanion"
# tcp_port: 5000
# bind_address: "0.0.0.0"
# - name: "BotCompanion"
# identity_key: "another_companion_identity_key_hex"
# settings:
# node_name: "meshcore-bot"
# tcp_port: 5001
radio:
# Frequency in Hz (869.618 MHz for EU)
frequency: 869618000
+28
View File
@@ -0,0 +1,28 @@
"""Companion identity support for pyMC Repeater.
Exposes the MeshCore companion frame protocol over TCP for standard clients.
"""
from .frame_server import CompanionFrameServer
from .constants import (
CMD_APP_START,
CMD_GET_CONTACTS,
CMD_SEND_TXT_MSG,
CMD_SYNC_NEXT_MESSAGE,
CMD_SEND_LOGIN,
RESP_CODE_OK,
RESP_CODE_ERR,
PUSH_CODE_MSG_WAITING,
)
__all__ = [
"CompanionFrameServer",
"CMD_APP_START",
"CMD_GET_CONTACTS",
"CMD_SEND_TXT_MSG",
"CMD_SYNC_NEXT_MESSAGE",
"CMD_SEND_LOGIN",
"RESP_CODE_OK",
"RESP_CODE_ERR",
"PUSH_CODE_MSG_WAITING",
]
+135
View File
@@ -0,0 +1,135 @@
"""Companion frame protocol constants (MeshCore Companion Radio Protocol)."""
# Commands (app -> radio)
CMD_APP_START = 1
CMD_SEND_TXT_MSG = 2
CMD_SEND_CHANNEL_TXT_MSG = 3
CMD_GET_CONTACTS = 4
CMD_GET_DEVICE_TIME = 5
CMD_SET_DEVICE_TIME = 6
CMD_SEND_SELF_ADVERT = 7
CMD_SET_ADVERT_NAME = 8
CMD_ADD_UPDATE_CONTACT = 9
CMD_SYNC_NEXT_MESSAGE = 10
CMD_SET_RADIO_PARAMS = 11
CMD_SET_RADIO_TX_POWER = 12
CMD_RESET_PATH = 13
CMD_SET_ADVERT_LATLON = 14
CMD_REMOVE_CONTACT = 15
CMD_SHARE_CONTACT = 16
CMD_EXPORT_CONTACT = 17
CMD_IMPORT_CONTACT = 18
CMD_REBOOT = 19
CMD_GET_BATT_AND_STORAGE = 20
CMD_SET_TUNING_PARAMS = 21
CMD_DEVICE_QUERY = 22
CMD_EXPORT_PRIVATE_KEY = 23
CMD_IMPORT_PRIVATE_KEY = 24
CMD_SEND_RAW_DATA = 25
CMD_SEND_LOGIN = 26
CMD_SEND_STATUS_REQ = 27
CMD_HAS_CONNECTION = 28
CMD_LOGOUT = 29
CMD_GET_CONTACT_BY_KEY = 30
CMD_GET_CHANNEL = 31
CMD_SET_CHANNEL = 32
CMD_SIGN_START = 33
CMD_SIGN_DATA = 34
CMD_SIGN_FINISH = 35
CMD_SEND_TRACE_PATH = 36
CMD_SET_DEVICE_PIN = 37
CMD_SET_OTHER_PARAMS = 38
CMD_SEND_TELEMETRY_REQ = 39
CMD_GET_CUSTOM_VARS = 40
CMD_SET_CUSTOM_VAR = 41
CMD_GET_ADVERT_PATH = 42
CMD_GET_TUNING_PARAMS = 43
CMD_SEND_BINARY_REQ = 50
CMD_FACTORY_RESET = 51
CMD_SEND_PATH_DISCOVERY_REQ = 52
CMD_SET_FLOOD_SCOPE = 54
CMD_SEND_CONTROL_DATA = 55
CMD_GET_STATS = 56
CMD_SEND_ANON_REQ = 57
CMD_SET_AUTOADD_CONFIG = 58
CMD_GET_AUTOADD_CONFIG = 59
# Response codes (radio -> app)
RESP_CODE_OK = 0
RESP_CODE_ERR = 1
RESP_CODE_CONTACTS_START = 2
RESP_CODE_CONTACT = 3
RESP_CODE_END_OF_CONTACTS = 4
RESP_CODE_SELF_INFO = 5
RESP_CODE_SENT = 6
RESP_CODE_CONTACT_MSG_RECV = 7
RESP_CODE_CHANNEL_MSG_RECV = 8
RESP_CODE_CURR_TIME = 9
RESP_CODE_NO_MORE_MESSAGES = 10
RESP_CODE_EXPORT_CONTACT = 11
RESP_CODE_BATT_AND_STORAGE = 12
RESP_CODE_DEVICE_INFO = 13 # CMD_DEVICE_QUERY response
RESP_CODE_PRIVATE_KEY = 14
RESP_CODE_DISABLED = 15
RESP_CODE_CONTACT_MSG_RECV_V3 = 16
RESP_CODE_CHANNEL_MSG_RECV_V3 = 17
RESP_CODE_CHANNEL_INFO = 18
RESP_CODE_SIGN_START = 19
RESP_CODE_SIGNATURE = 20
RESP_CODE_CUSTOM_VARS = 21
RESP_CODE_ADVERT_PATH = 22
RESP_CODE_TUNING_PARAMS = 23
RESP_CODE_STATS = 24
RESP_CODE_AUTOADD_CONFIG = 25
# Push codes (radio -> app, unsolicited)
PUSH_CODE_ADVERT = 0x80
PUSH_CODE_PATH_UPDATED = 0x81
PUSH_CODE_SEND_CONFIRMED = 0x82
PUSH_CODE_MSG_WAITING = 0x83
PUSH_CODE_RAW_DATA = 0x84
PUSH_CODE_LOGIN_SUCCESS = 0x85
PUSH_CODE_LOGIN_FAIL = 0x86
PUSH_CODE_STATUS_RESPONSE = 0x87
PUSH_CODE_LOG_RX_DATA = 0x88
PUSH_CODE_TRACE_DATA = 0x89
PUSH_CODE_NEW_ADVERT = 0x8A
PUSH_CODE_TELEMETRY_RESPONSE = 0x8B
PUSH_CODE_BINARY_RESPONSE = 0x8C
PUSH_CODE_PATH_DISCOVERY_RESPONSE = 0x8D
PUSH_CODE_CONTROL_DATA = 0x8E
PUSH_CODE_CONTACT_DELETED = 0x8F
PUSH_CODE_CONTACTS_FULL = 0x90
# Error codes
ERR_CODE_UNSUPPORTED_CMD = 1
ERR_CODE_NOT_FOUND = 2
ERR_CODE_TABLE_FULL = 3
ERR_CODE_BAD_STATE = 4
ERR_CODE_FILE_IO_ERROR = 5
ERR_CODE_ILLEGAL_ARG = 6
# Stats sub-types
STATS_TYPE_CORE = 0
STATS_TYPE_RADIO = 1
STATS_TYPE_PACKETS = 2
# Frame delimiters (USB/TCP: > = outbound, < = inbound)
FRAME_OUTBOUND_PREFIX = 0x3E # '>'
FRAME_INBOUND_PREFIX = 0x3C # '<'
MAX_FRAME_SIZE = 512
PUB_KEY_SIZE = 32
MAX_PATH_SIZE = 64
# ADV types
ADV_TYPE_CHAT = 1
ADV_TYPE_REPEATER = 2
ADV_TYPE_ROOM = 3
ADV_TYPE_SENSOR = 4
# Default Public channel PSK (from firmware)
PUBLIC_GROUP_PSK = b"izOH6cXN6mrJ5e26oRXNcg=="
# Default public channel secret (hex) - used for channel 0 when no channels loaded
# Matches MeshCore firmware default for new radios
DEFAULT_PUBLIC_CHANNEL_SECRET = bytes.fromhex("8b3387e9c5cdea6ac9e5edbaa115cd72")
+729
View File
@@ -0,0 +1,729 @@
"""
Companion frame protocol TCP server.
Implements the MeshCore Companion Radio Protocol over TCP for standard clients.
Frame format: outbound '>' + 2-byte len (LE) + data; inbound '<' + 2-byte len + data.
"""
import asyncio
import base64
import logging
import struct
import time
from typing import Optional
from pymc_core.companion.constants import ADV_TYPE_CHAT
from pymc_core.companion.models import Contact, QueuedMessage
from .constants import (
RESP_CODE_DEVICE_INFO,
CMD_ADD_UPDATE_CONTACT,
CMD_GET_CHANNEL,
CMD_SET_CHANNEL,
CMD_SET_FLOOD_SCOPE,
CMD_APP_START,
CMD_DEVICE_QUERY,
CMD_GET_BATT_AND_STORAGE,
CMD_GET_CONTACTS,
CMD_GET_STATS,
CMD_IMPORT_CONTACT,
CMD_REMOVE_CONTACT,
CMD_RESET_PATH,
CMD_SEND_CHANNEL_TXT_MSG,
CMD_SEND_LOGIN,
CMD_SEND_SELF_ADVERT,
CMD_SEND_TXT_MSG,
CMD_SET_ADVERT_LATLON,
CMD_SET_ADVERT_NAME,
CMD_SYNC_NEXT_MESSAGE,
ERR_CODE_BAD_STATE,
ERR_CODE_ILLEGAL_ARG,
ERR_CODE_NOT_FOUND,
ERR_CODE_UNSUPPORTED_CMD,
FRAME_INBOUND_PREFIX,
FRAME_OUTBOUND_PREFIX,
MAX_FRAME_SIZE,
MAX_PATH_SIZE,
PUB_KEY_SIZE,
PUSH_CODE_ADVERT,
PUSH_CODE_MSG_WAITING,
PUSH_CODE_PATH_UPDATED,
PUSH_CODE_SEND_CONFIRMED,
RESP_CODE_BATT_AND_STORAGE,
RESP_CODE_CHANNEL_INFO,
RESP_CODE_CHANNEL_MSG_RECV,
RESP_CODE_CHANNEL_MSG_RECV_V3,
RESP_CODE_CONTACT,
RESP_CODE_CONTACT_MSG_RECV_V3,
RESP_CODE_CONTACT_MSG_RECV,
RESP_CODE_CONTACTS_START,
RESP_CODE_END_OF_CONTACTS,
RESP_CODE_ERR,
RESP_CODE_NO_MORE_MESSAGES,
RESP_CODE_OK,
RESP_CODE_SELF_INFO,
RESP_CODE_SENT,
RESP_CODE_STATS,
STATS_TYPE_CORE,
STATS_TYPE_PACKETS,
STATS_TYPE_RADIO,
)
logger = logging.getLogger("CompanionFrameServer")
class CompanionFrameServer:
"""TCP server for the MeshCore companion frame protocol.
One client per companion at a time. Listens on configured port.
"""
def __init__(
self,
bridge,
companion_hash: str,
port: int = 5000,
bind_address: str = "0.0.0.0",
sqlite_handler=None,
):
self.bridge = bridge
self.companion_hash = companion_hash
self.port = port
self.bind_address = bind_address
self.sqlite_handler = sqlite_handler
self._server: Optional[asyncio.Server] = None
self._client_writer: Optional[asyncio.StreamWriter] = None
self._client_reader: Optional[asyncio.StreamReader] = None
self._app_target_ver = 0
async def start(self) -> None:
"""Start the TCP server."""
self._server = await asyncio.start_server(
self._handle_client,
self.bind_address,
self.port,
)
addr = self._server.sockets[0].getsockname() if self._server.sockets else (self.bind_address, self.port)
logger.info(f"Companion frame server listening on {addr[0]}:{addr[1]} (hash=0x{int(self.companion_hash):02x})")
async def stop(self) -> None:
"""Stop the TCP server and disconnect any client."""
if self._client_writer:
try:
self._client_writer.close()
await self._client_writer.wait_closed()
except Exception:
pass
self._client_writer = None
self._client_reader = None
if self._server:
self._server.close()
await self._server.wait_closed()
self._server = None
logger.info(f"Companion frame server stopped (port={self.port})")
def _setup_push_callbacks(self) -> None:
"""Subscribe to bridge events and send PUSH frames to connected client."""
def _write_push(data: bytes) -> None:
if self._client_writer and not self._client_writer.is_closing():
try:
frame = bytes([FRAME_OUTBOUND_PREFIX]) + struct.pack("<H", len(data)) + data
self._client_writer.write(frame)
asyncio.create_task(self._drain_writer())
except Exception as e:
logger.debug(f"Push write error: {e}")
async def on_message_received(sender_key, text, timestamp, txt_type):
if self.sqlite_handler:
self.sqlite_handler.companion_push_message(
self.companion_hash,
{
"sender_key": sender_key,
"text": text,
"timestamp": timestamp,
"txt_type": txt_type,
"is_channel": False,
"channel_idx": 0,
"path_len": 0,
},
)
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
async def on_send_confirmed(crc):
data = struct.pack("<B4sI", PUSH_CODE_SEND_CONFIRMED, struct.pack("<I", crc)[:4], 0)
_write_push(data)
async def on_advert_received(contact):
if isinstance(contact, dict):
pubkey = contact.get("public_key", b"")
if isinstance(pubkey, str):
pubkey = bytes.fromhex(pubkey)
else:
pubkey = getattr(contact, "public_key", getattr(contact, "pub_key", b""))
if len(pubkey) >= 32:
_write_push(bytes([PUSH_CODE_ADVERT]) + pubkey[:32])
async def on_contact_path_updated(pub_key, path_len, path):
if isinstance(pub_key, bytes) and len(pub_key) >= 32:
_write_push(bytes([PUSH_CODE_PATH_UPDATED]) + pub_key[:32])
async def on_channel_message_received(channel_name, sender_name, message_text, timestamp, path_len=0):
channel_idx = 0
max_ch = getattr(
getattr(self.bridge, "channels", None), "max_channels", 40
)
for idx in range(max_ch):
ch = self.bridge.get_channel(idx)
if ch and ch.name == channel_name:
channel_idx = idx
break
if self.sqlite_handler:
self.sqlite_handler.companion_push_message(
self.companion_hash,
{
"sender_key": b"",
"text": message_text,
"timestamp": timestamp,
"txt_type": 0,
"is_channel": True,
"channel_idx": channel_idx,
"path_len": path_len,
},
)
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
self.bridge.on_message_received(on_message_received)
self.bridge.on_channel_message_received(on_channel_message_received)
self.bridge.on_send_confirmed(on_send_confirmed)
self.bridge.on_advert_received(on_advert_received)
self.bridge.on_contact_path_updated(on_contact_path_updated)
async def _drain_writer(self) -> None:
if self._client_writer:
try:
await self._client_writer.drain()
except Exception:
pass
def _write_frame(self, data: bytes) -> None:
"""Send a frame to the connected client (outbound format)."""
if self._client_writer and not self._client_writer.is_closing():
frame = bytes([FRAME_OUTBOUND_PREFIX]) + struct.pack("<H", len(data)) + data
self._client_writer.write(frame)
def _write_ok(self) -> None:
self._write_frame(bytes([RESP_CODE_OK]))
def _write_err(self, err_code: int) -> None:
self._write_frame(bytes([RESP_CODE_ERR, err_code]))
def _save_contacts(self) -> None:
"""Persist contacts to SQLite."""
if not self.sqlite_handler:
return
contacts = self.bridge.get_contacts()
dicts = []
for c in contacts:
pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
dicts.append({
"pubkey": pk,
"name": c.name,
"adv_type": c.adv_type,
"flags": c.flags,
"out_path_len": c.out_path_len,
"out_path": c.out_path if isinstance(c.out_path, bytes) else (bytes.fromhex(c.out_path) if c.out_path else b""),
"last_advert_timestamp": c.last_advert_timestamp,
"lastmod": c.lastmod,
"gps_lat": c.gps_lat,
"gps_lon": c.gps_lon,
"sync_since": c.sync_since,
})
self.sqlite_handler.companion_save_contacts(self.companion_hash, dicts)
async def _handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""Handle a new client connection. One client at a time."""
if self._client_writer:
logger.warning("Companion already has a client; rejecting new connection")
writer.close()
await writer.wait_closed()
return
self._client_reader = reader
self._client_writer = writer
self._setup_push_callbacks()
logger.info(f"Companion client connected (port={self.port})")
try:
while True:
prefix = await reader.read(1)
if not prefix:
break
if prefix[0] != FRAME_INBOUND_PREFIX:
logger.warning(f"Invalid frame prefix: 0x{prefix[0]:02x}")
continue
len_bytes = await reader.readexactly(2)
frame_len = struct.unpack("<H", len_bytes)[0]
if frame_len > MAX_FRAME_SIZE:
logger.warning(f"Frame too long: {frame_len}")
break
payload = await reader.readexactly(frame_len)
await self._handle_cmd(payload)
except asyncio.IncompleteReadError:
pass
except ConnectionResetError:
pass
except Exception as e:
logger.error(f"Client handler error: {e}", exc_info=True)
finally:
self._client_writer = None
self._client_reader = None
logger.info(f"Companion client disconnected (port={self.port})")
async def _handle_cmd(self, payload: bytes) -> None:
"""Dispatch command to handler."""
if not payload:
return
cmd = payload[0]
data = payload[1:]
if cmd in (CMD_GET_CHANNEL, CMD_SET_CHANNEL):
logger.debug(f"Companion cmd 0x{cmd:02x} ({'GET_CHANNEL' if cmd == CMD_GET_CHANNEL else 'SET_CHANNEL'}), payload_len={len(payload)}")
try:
if cmd == CMD_APP_START:
await self._cmd_app_start(data)
elif cmd == CMD_DEVICE_QUERY:
await self._cmd_device_query(data)
elif cmd == CMD_GET_CONTACTS:
await self._cmd_get_contacts(data)
elif cmd == CMD_SEND_TXT_MSG:
await self._cmd_send_txt_msg(data)
elif cmd == CMD_SEND_CHANNEL_TXT_MSG:
await self._cmd_send_channel_txt_msg(data)
elif cmd == CMD_SYNC_NEXT_MESSAGE:
await self._cmd_sync_next_message(data)
elif cmd == CMD_SEND_LOGIN:
await self._cmd_send_login(data)
elif cmd == CMD_SEND_SELF_ADVERT:
await self._cmd_send_self_advert(data)
elif cmd == CMD_SET_ADVERT_NAME:
await self._cmd_set_advert_name(data)
elif cmd == CMD_SET_ADVERT_LATLON:
await self._cmd_set_advert_latlon(data)
elif cmd == CMD_ADD_UPDATE_CONTACT:
await self._cmd_add_update_contact(data)
elif cmd == CMD_REMOVE_CONTACT:
await self._cmd_remove_contact(data)
elif cmd == CMD_RESET_PATH:
await self._cmd_reset_path(data)
elif cmd == CMD_GET_BATT_AND_STORAGE:
await self._cmd_get_batt_and_storage(data)
elif cmd == CMD_GET_STATS:
await self._cmd_get_stats(data)
elif cmd == CMD_IMPORT_CONTACT:
await self._cmd_import_contact(data)
elif cmd == CMD_GET_CHANNEL:
await self._cmd_get_channel(data)
elif cmd == CMD_SET_CHANNEL:
await self._cmd_set_channel(data)
elif cmd == CMD_SET_FLOOD_SCOPE:
# App sends this on connect; no-op for repeater companion (no radio scope)
self._write_ok()
else:
self._write_err(ERR_CODE_UNSUPPORTED_CMD)
except Exception as e:
logger.error(f"Cmd 0x{cmd:02x} error: {e}", exc_info=True)
self._write_err(ERR_CODE_ILLEGAL_ARG)
async def _cmd_app_start(self, data: bytes) -> None:
if len(data) >= 1:
self._app_target_ver = data[0]
# RESP_CODE_SELF_INFO - name is varchar (remainder of frame)
# Send name without null terminator; client displays remainder of frame as-is
prefs = self.bridge.get_self_info()
pubkey = self.bridge.get_public_key()
name = prefs.node_name.encode("utf-8", errors="replace")
lat = int(getattr(prefs, "latitude", 0) * 1e6)
lon = int(getattr(prefs, "longitude", 0) * 1e6)
frame = (
bytes([RESP_CODE_SELF_INFO, ADV_TYPE_CHAT, prefs.tx_power_dbm, 22])
+ pubkey
+ struct.pack("<ii", lat, lon)
+ bytes([getattr(prefs, "multi_acks", 0), getattr(prefs, "advert_loc_policy", 0)])
+ bytes([getattr(prefs, "telemetry_mode_base", 0) | (getattr(prefs, "telemetry_mode_location", 0) << 2)])
+ bytes([getattr(prefs, "manual_add_contacts", 0)])
+ struct.pack(
"<II",
prefs.frequency_hz // 1000, # radio_freq: freq * 1000 (e.g. 915 MHz → 915000)
prefs.bandwidth_hz, # radio_bw: bandwidth(kHz) * 1000 (e.g. 125 kHz → 125000)
)
+ bytes([prefs.spreading_factor, prefs.coding_rate])
+ name
)
self._write_frame(frame)
async def _cmd_device_query(self, data: bytes) -> None:
if len(data) >= 1:
self._app_target_ver = data[0]
firmware_ver = 8
# Protocol: max_contacts_div_2 and max_channels are bytes (ver 3+)
max_contacts = getattr(
getattr(self.bridge, "contacts", None), "max_contacts", 1000
)
max_channels_val = getattr(
getattr(self.bridge, "channels", None), "max_channels", 40
)
max_contacts_div_2 = min(max_contacts // 2, 255)
max_channels = min(max_channels_val, 255)
ble_pin = 0
build_date = b"13 Feb 2026\x00"[:12].ljust(12, b"\x00")
model = b"pyMC-Repeater-Companion\x00"[:40].ljust(40, b"\x00")
version = b"1.0.0\x00"[:20].ljust(20, b"\x00")
frame = (
bytes([RESP_CODE_DEVICE_INFO, firmware_ver, max_contacts_div_2, max_channels])
+ struct.pack("<I", ble_pin)
+ build_date
+ model
+ version
)
self._write_frame(frame)
async def _cmd_get_contacts(self, data: bytes) -> None:
since = struct.unpack("<I", data[:4])[0] if len(data) >= 4 else 0
contacts = self.bridge.get_contacts(since=since)
self._write_frame(bytes([RESP_CODE_CONTACTS_START]) + struct.pack("<I", len(contacts)))
for c in contacts:
pubkey = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
name = (c.name.encode("utf-8")[:32] if isinstance(c.name, str) else c.name[:32]).ljust(32, b"\x00")
# out_path_len is signed byte: -1 (unknown) -> 0xFF, else 0-255
opl = c.out_path_len if hasattr(c, "out_path_len") else -1
opl_byte = 0xFF if opl < 0 else min(opl, 255)
frame = (
bytes([RESP_CODE_CONTACT])
+ pubkey
+ bytes([c.adv_type if hasattr(c, "adv_type") else 0, c.flags if hasattr(c, "flags") else 0])
+ bytes([opl_byte])
+ (c.out_path[:MAX_PATH_SIZE] if hasattr(c, "out_path") and c.out_path else b"").ljust(MAX_PATH_SIZE, b"\x00")
+ name
+ struct.pack("<I", c.last_advert_timestamp if hasattr(c, "last_advert_timestamp") else 0)
+ struct.pack("<i", int((c.gps_lat if hasattr(c, "gps_lat") else 0) * 1e6))
+ struct.pack("<i", int((c.gps_lon if hasattr(c, "gps_lon") else 0) * 1e6))
+ struct.pack("<I", c.lastmod if hasattr(c, "lastmod") else 0)
)
self._write_frame(frame)
most_recent = max((c.lastmod for c in contacts), default=0)
self._write_frame(bytes([RESP_CODE_END_OF_CONTACTS]) + struct.pack("<I", most_recent))
async def _cmd_send_txt_msg(self, data: bytes) -> None:
if len(data) < 12:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
txt_type = data[0]
attempt = data[1]
sender_ts = struct.unpack_from("<I", data, 2)[0]
pubkey_prefix = data[6:12]
text = data[12:].decode("utf-8", errors="replace").rstrip("\x00")
contact = self.bridge.contacts.get_by_key_prefix(pubkey_prefix) if hasattr(self.bridge.contacts, "get_by_key_prefix") else None
if not contact:
for c in self.bridge.get_contacts():
pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
if pk[:6] == pubkey_prefix:
contact = c
break
if not contact:
self._write_err(ERR_CODE_NOT_FOUND)
return
pubkey = contact.public_key if isinstance(contact.public_key, bytes) else bytes.fromhex(contact.public_key)
result = await self.bridge.send_text_message(pubkey, text, txt_type=txt_type, attempt=attempt + 1)
if result.success:
ack = result.expected_ack or 0
timeout = result.timeout_ms or 5000
frame = bytes([RESP_CODE_SENT, 1 if result.is_flood else 0]) + struct.pack("<II", ack, timeout)
self._write_frame(frame)
else:
self._write_err(ERR_CODE_BAD_STATE)
async def _cmd_send_channel_txt_msg(self, data: bytes) -> None:
if len(data) < 6:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
# Protocol: txt_type(1) + channel_idx(1) + sender_timestamp(4) + text
channel_idx = data[1]
sender_ts = struct.unpack_from("<I", data, 2)[0]
text = data[6:].decode("utf-8", errors="replace").rstrip("\x00")
ok = await self.bridge.send_channel_message(channel_idx, text)
self._write_ok() if ok else self._write_err(ERR_CODE_BAD_STATE)
async def _cmd_sync_next_message(self, data: bytes) -> None:
msg = self.bridge.sync_next_message()
if msg is None and self.sqlite_handler:
msg_dict = self.sqlite_handler.companion_pop_message(self.companion_hash)
if msg_dict:
msg = QueuedMessage(
sender_key=msg_dict.get("sender_key", b""),
txt_type=msg_dict.get("txt_type", 0),
timestamp=msg_dict.get("timestamp", 0),
text=msg_dict.get("text", ""),
is_channel=bool(msg_dict.get("is_channel", False)),
channel_idx=msg_dict.get("channel_idx", 0),
path_len=msg_dict.get("path_len", 0),
)
if msg is None:
self._write_frame(bytes([RESP_CODE_NO_MORE_MESSAGES]))
return
if msg.is_channel:
path_len_byte = msg.path_len if msg.path_len < 256 else 0xFF
txt_type = 0 # TXT_TYPE_PLAIN
text_bytes = msg.text.encode("utf-8", errors="replace")
if self._app_target_ver >= 3:
# RESP_CODE_CHANNEL_MSG_RECV_V3: code + snr + reserved(2) + channel_idx + path_len + txt_type + timestamp + text
frame = bytes([
RESP_CODE_CHANNEL_MSG_RECV_V3,
0, # snr (we don't have it when popping from queue)
0, 0, # reserved
msg.channel_idx,
path_len_byte,
txt_type,
]) + struct.pack("<I", msg.timestamp) + text_bytes
else:
frame = bytes([RESP_CODE_CHANNEL_MSG_RECV, msg.channel_idx, path_len_byte, txt_type])
frame += struct.pack("<I", msg.timestamp) + text_bytes
else:
prefix = msg.sender_key[:6] if len(msg.sender_key) >= 6 else msg.sender_key.ljust(6, b"\x00")
path_len_byte = msg.path_len if msg.path_len < 256 else 0xFF
text_bytes = msg.text.encode("utf-8", errors="replace")
if self._app_target_ver >= 3:
frame = bytes([
RESP_CODE_CONTACT_MSG_RECV_V3,
0, 0, 0, # snr + reserved
]) + prefix + bytes([path_len_byte, msg.txt_type]) + struct.pack("<I", msg.timestamp) + text_bytes
else:
frame = bytes([RESP_CODE_CONTACT_MSG_RECV]) + prefix + bytes([path_len_byte, msg.txt_type])
frame += struct.pack("<I", msg.timestamp) + text_bytes
self._write_frame(frame)
async def _cmd_send_login(self, data: bytes) -> None:
if len(data) < 33:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:32]
password = data[32:].decode("utf-8", errors="replace").rstrip("\x00")
self._write_frame(bytes([RESP_CODE_SENT, 1]) + struct.pack("<II", 0, 10000))
result = await self.bridge.send_login(pubkey, password)
if result.get("success"):
self._write_frame(
bytes([PUSH_CODE_LOGIN_SUCCESS, 1 if result.get("is_admin") else 0])
+ pubkey[:6]
+ struct.pack("<I", result.get("tag", 0))
+ bytes([result.get("acl_permissions", 0)])
)
else:
self._write_frame(bytes([PUSH_CODE_LOGIN_FAIL, 0]) + pubkey[:6])
async def _cmd_send_self_advert(self, data: bytes) -> None:
flood = len(data) >= 1 and data[0] == 1
ok = await self.bridge.advertise(flood=flood)
self._write_ok() if ok else self._write_err(ERR_CODE_BAD_STATE)
async def _cmd_set_advert_name(self, data: bytes) -> None:
name = data.decode("utf-8", errors="replace").rstrip("\x00")
self.bridge.set_advert_name(name)
self._write_ok()
async def _cmd_set_advert_latlon(self, data: bytes) -> None:
if len(data) < 8:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
lat, lon = struct.unpack_from("<ii", data, 0)
self.bridge.set_advert_latlon(lat / 1e6, lon / 1e6)
self._write_ok()
async def _cmd_add_update_contact(self, data: bytes) -> None:
if len(data) < 73:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[0:32]
adv_type = data[32]
flags = data[33]
out_path_len = data[34]
out_path = data[35:35 + MAX_PATH_SIZE]
name_raw = data[35 + MAX_PATH_SIZE:35 + MAX_PATH_SIZE + 32]
name = name_raw.split(b"\x00")[0].decode("utf-8", errors="replace")
last_advert = struct.unpack_from("<I", data, 35 + MAX_PATH_SIZE + 32)[0]
gps_lat, gps_lon = 0.0, 0.0
if len(data) >= 35 + MAX_PATH_SIZE + 32 + 12:
gps_lat = struct.unpack_from("<i", data, 35 + MAX_PATH_SIZE + 32 + 4)[0] / 1e6
gps_lon = struct.unpack_from("<i", data, 35 + MAX_PATH_SIZE + 32 + 8)[0] / 1e6
contact = Contact(
public_key=pubkey,
name=name,
adv_type=adv_type,
flags=flags,
out_path_len=out_path_len,
out_path=out_path.rstrip(b"\x00"),
last_advert_timestamp=last_advert,
lastmod=int(time.time()),
gps_lat=gps_lat,
gps_lon=gps_lon,
)
ok = self.bridge.add_update_contact(contact)
if ok and self.sqlite_handler:
self._save_contacts()
self._write_ok() if ok else self._write_err(ERR_CODE_TABLE_FULL)
async def _cmd_remove_contact(self, data: bytes) -> None:
if len(data) < 32:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:32]
ok = self.bridge.remove_contact(pubkey)
if ok and self.sqlite_handler:
self._save_contacts()
self._write_ok() if ok else self._write_err(ERR_CODE_NOT_FOUND)
async def _cmd_reset_path(self, data: bytes) -> None:
if len(data) < 32:
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
pubkey = data[:32]
ok = self.bridge.reset_path(pubkey)
self._write_ok() if ok else self._write_err(ERR_CODE_NOT_FOUND)
async def _cmd_get_batt_and_storage(self, data: bytes) -> None:
millivolts = 0
used_kb = 0
total_kb = 0
frame = bytes([RESP_CODE_BATT_AND_STORAGE]) + struct.pack("<H", millivolts) + struct.pack("<II", used_kb, total_kb)
self._write_frame(frame)
async def _cmd_get_stats(self, data: bytes) -> None:
stats_type = data[0] if len(data) >= 1 else STATS_TYPE_PACKETS
stats = self.bridge.get_stats(stats_type)
frame = bytes([RESP_CODE_STATS, stats_type])
if stats_type == STATS_TYPE_CORE:
frame += struct.pack("<h", 0)
frame += struct.pack("<I", int(stats.get("uptime_secs", 0)))
frame += struct.pack("<H", 0)
frame += bytes([stats.get("queue_len", 0)])
elif stats_type == STATS_TYPE_RADIO:
frame += struct.pack("<h", stats.get("last_rssi", 0) or 0)
frame += bytes([stats.get("last_rssi", 0) or 0, int((stats.get("last_snr", 0) or 0) * 4)])
frame += struct.pack("<II", 0, 0)
else:
frame += struct.pack("<II", stats.get("num_recv_packets", 0), stats.get("num_sent_packets", 0))
frame += struct.pack("<II", stats.get("num_sent_flood", 0), stats.get("num_sent_direct", 0))
frame += struct.pack("<II", stats.get("num_recv_flood", 0), stats.get("num_recv_direct", 0))
self._write_frame(frame)
async def _cmd_import_contact(self, data: bytes) -> None:
ok = self.bridge.import_contact(data)
self._write_ok() if ok else self._write_err(ERR_CODE_ILLEGAL_ARG)
async def _cmd_get_channel(self, data: bytes) -> None:
# Payload: channel index (1 byte), or empty for "get full list" (some apps send
# one request with no payload to receive all channels in one go).
channel_idx = data[0] if len(data) >= 1 else 0
get_full_list = len(data) == 0
max_channels_val = getattr(
getattr(self.bridge, "channels", None), "max_channels", 40
)
logger.debug(
f"CMD_GET_CHANNEL: idx={channel_idx}, data_len={len(data)}, get_full_list={get_full_list}"
)
def _channel_info_frame(idx: int, ch, include_idx: bool = True) -> bytes:
if ch is None:
name = b"\x00" * 32
secret = b"\x00" * 32
else:
name = ch.name.encode("utf-8", errors="replace")[:32].ljust(
32, b"\x00"
)
secret = (
ch.secret[:32].ljust(32, b"\x00") if ch.secret else b"\x00" * 32
)
if include_idx:
return bytes([RESP_CODE_CHANNEL_INFO, idx]) + name + secret
# Some apps expect code + name(32) + secret(32) only (no index byte)
return bytes([RESP_CODE_CHANNEL_INFO]) + name + secret
if get_full_list:
for idx in range(max_channels_val):
ch = self.bridge.get_channel(idx)
frame = _channel_info_frame(idx, ch, include_idx=True)
self._write_frame(frame)
logger.debug(f"CMD_GET_CHANNEL: sent full list ({max_channels_val} slots)")
return
if channel_idx < 0 or channel_idx >= max_channels_val:
logger.debug(f"CMD_GET_CHANNEL: channel {channel_idx} out of range")
self._write_err(ERR_CODE_NOT_FOUND)
return
ch = self.bridge.get_channel(channel_idx)
if ch is None:
logger.debug(f"CMD_GET_CHANNEL: returning empty slot {channel_idx}")
else:
logger.debug(f"CMD_GET_CHANNEL: returning {ch.name!r}, secret_len=32")
# Send code + name(32) + secret(32) without channel_idx so the app sees
# name at offset 1 (avoids app treating channel_idx as first byte of name).
frame = _channel_info_frame(channel_idx, ch, include_idx=False)
self._write_frame(frame)
async def _cmd_set_channel(self, data: bytes) -> None:
# MeshCore format: channel_idx(1) + name(32) + secret(32) or secret_hex(64)
logger.debug(f"CMD_SET_CHANNEL: data_len={len(data)}, data_hex={data[:50].hex()}...")
if len(data) < 34: # minimum: idx + name(32) + at least 1 byte secret
logger.debug(f"CMD_SET_CHANNEL: rejected (len {len(data)} < 34)")
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
channel_idx = data[0]
name_raw = data[1:33]
name = name_raw.split(b"\x00")[0].decode("utf-8", errors="replace").strip()
if len(data) >= 97:
# Hex secret: 64 hex chars = 32 bytes
try:
secret = bytes.fromhex(data[33:97].decode("ascii"))
logger.debug(f"CMD_SET_CHANNEL: parsed hex secret, len={len(secret)}")
except (ValueError, UnicodeDecodeError) as e:
logger.debug(f"CMD_SET_CHANNEL: hex secret parse failed: {e}")
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
elif len(data) >= 65:
# Binary secret: 32 bytes (MeshCore DataStore format)
secret = data[33:65]
logger.debug(f"CMD_SET_CHANNEL: parsed 32-byte binary secret")
elif len(data) >= 49:
# Legacy: 16-byte binary secret
secret = data[33:49]
logger.debug(f"CMD_SET_CHANNEL: parsed 16-byte binary secret")
else:
logger.debug(f"CMD_SET_CHANNEL: rejected (len {len(data)} not in 49/65/97)")
self._write_err(ERR_CODE_ILLEGAL_ARG)
return
logger.debug(f"CMD_SET_CHANNEL: idx={channel_idx}, name={name!r}, secret_len={len(secret)}")
ok = self.bridge.set_channel(channel_idx, name, secret)
if ok and self.sqlite_handler:
self._save_channels()
logger.debug(f"CMD_SET_CHANNEL: set_channel ok={ok}")
self._write_ok() if ok else self._write_err(ERR_CODE_TABLE_FULL)
def _save_channels(self) -> None:
"""Persist channels to SQLite."""
if not self.sqlite_handler:
return
channels = []
max_ch = getattr(
getattr(self.bridge, "channels", None), "max_channels", 40
)
for idx in range(max_ch):
ch = self.bridge.get_channel(idx)
if ch is not None:
channels.append({
"channel_idx": idx,
"name": ch.name,
"secret": ch.secret,
})
self.sqlite_handler.companion_save_channels(self.companion_hash, channels)
+21 -15
View File
@@ -1,7 +1,9 @@
from __future__ import annotations
import logging
import os
import yaml
from typing import Optional, Dict, Any, List
from typing import Any, Dict, List, Optional
logger = logging.getLogger("ConfigManager")
@@ -22,32 +24,35 @@ class ConfigManager:
self.config = config
self.daemon = daemon_instance
def save_to_file(self) -> bool:
def save_to_file(self) -> tuple[bool, str]:
"""
Save current config to YAML file.
Returns:
True if successful, False otherwise
(True, "") if successful, (False, error_message) otherwise
"""
try:
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
dirpath = os.path.dirname(self.config_path)
if dirpath:
os.makedirs(dirpath, exist_ok=True)
with open(self.config_path, 'w') as f:
# Use safe_dump with explicit width to prevent line wrapping
# Setting width to a very large number prevents truncation of long strings like identity keys
yaml.safe_dump(
self.config,
f,
default_flow_style=False,
indent=2,
self.config,
f,
default_flow_style=False,
indent=2,
width=1000000, # Very large width to prevent any line wrapping
sort_keys=False,
allow_unicode=True
)
logger.info(f"Configuration saved to {self.config_path}")
return True
return True, ""
except Exception as e:
logger.error(f"Failed to save config to {self.config_path}: {e}", exc_info=True)
return False
msg = f"Failed to save config to {self.config_path}: {e}"
logger.error(msg, exc_info=True)
return False, str(e)
def live_update_daemon(self, sections: Optional[List[str]] = None) -> bool:
"""
@@ -140,10 +145,11 @@ class ConfigManager:
self.config[section] = values
# Save to file
result["saved"] = self.save_to_file()
saved, err = self.save_to_file()
result["saved"] = saved
if not result["saved"]:
result["error"] = "Failed to save config to file"
result["error"] = err or "Failed to save config to file"
return result
# Live update daemon if requested
+216 -2
View File
@@ -249,9 +249,75 @@ class SQLiteHandler:
(migration_name, time.time())
)
logger.info(f"Migration '{migration_name}' applied successfully")
# Migration 4: Add companion tables for companion identity persistence
migration_name = "add_companion_tables"
existing = conn.execute(
"SELECT migration_name FROM migrations WHERE migration_name = ?",
(migration_name,)
).fetchone()
if not existing:
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='companion_contacts'"
)
if not cursor.fetchone():
conn.execute("""
CREATE TABLE companion_contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
companion_hash TEXT NOT NULL,
pubkey BLOB NOT NULL,
name TEXT NOT NULL,
adv_type INTEGER NOT NULL DEFAULT 0,
flags INTEGER NOT NULL DEFAULT 0,
out_path_len INTEGER NOT NULL DEFAULT -1,
out_path BLOB,
last_advert_timestamp INTEGER NOT NULL DEFAULT 0,
lastmod INTEGER NOT NULL DEFAULT 0,
gps_lat REAL NOT NULL DEFAULT 0,
gps_lon REAL NOT NULL DEFAULT 0,
sync_since INTEGER NOT NULL DEFAULT 0,
updated_at REAL NOT NULL
)
""")
conn.execute("""
CREATE TABLE companion_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
companion_hash TEXT NOT NULL,
channel_idx INTEGER NOT NULL,
name TEXT NOT NULL,
secret BLOB NOT NULL,
updated_at REAL NOT NULL
)
""")
conn.execute("""
CREATE TABLE companion_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
companion_hash TEXT NOT NULL,
sender_key BLOB NOT NULL,
txt_type INTEGER NOT NULL DEFAULT 0,
timestamp INTEGER NOT NULL DEFAULT 0,
text TEXT NOT NULL,
is_channel INTEGER NOT NULL DEFAULT 0,
channel_idx INTEGER NOT NULL DEFAULT 0,
path_len INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_contacts_hash ON companion_contacts(companion_hash)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_contacts_pubkey ON companion_contacts(companion_hash, pubkey)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_channels_hash ON companion_channels(companion_hash)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_messages_hash ON companion_messages(companion_hash)")
logger.info("Created companion_contacts, companion_channels, companion_messages tables")
conn.execute(
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
(migration_name, time.time())
)
logger.info(f"Migration '{migration_name}' applied successfully")
conn.commit()
except Exception as e:
logger.error(f"Failed to run migrations: {e}")
@@ -1344,3 +1410,151 @@ class SQLiteHandler:
except Exception as e:
logger.error(f"Failed to cleanup old messages: {e}")
return 0
# Companion persistence methods
def companion_load_contacts(self, companion_hash: str) -> List[Dict]:
"""Load contacts for a companion from storage."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT pubkey, name, adv_type, flags, out_path_len, out_path,
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since
FROM companion_contacts WHERE companion_hash = ?
""", (companion_hash,))
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Failed to load companion contacts: {e}")
return []
def companion_save_contacts(self, companion_hash: str, contacts: List[Dict]) -> bool:
"""Replace all contacts for a companion in storage."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.execute("DELETE FROM companion_contacts WHERE companion_hash = ?", (companion_hash,))
now = time.time()
for c in contacts:
conn.execute("""
INSERT INTO companion_contacts
(companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
companion_hash,
c.get("pubkey", b""),
c.get("name", ""),
c.get("adv_type", 0),
c.get("flags", 0),
c.get("out_path_len", -1),
c.get("out_path", b""),
c.get("last_advert_timestamp", 0),
c.get("lastmod", 0),
c.get("gps_lat", 0.0),
c.get("gps_lon", 0.0),
c.get("sync_since", 0),
now,
))
conn.commit()
return True
except Exception as e:
logger.error(f"Failed to save companion contacts: {e}")
return False
def companion_load_channels(self, companion_hash: str) -> List[Dict]:
"""Load channels for a companion from storage."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT channel_idx, name, secret FROM companion_channels
WHERE companion_hash = ? ORDER BY channel_idx
""", (companion_hash,))
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Failed to load companion channels: {e}")
return []
def companion_save_channels(self, companion_hash: str, channels: List[Dict]) -> bool:
"""Replace all channels for a companion in storage."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.execute("DELETE FROM companion_channels WHERE companion_hash = ?", (companion_hash,))
now = time.time()
for ch in channels:
conn.execute("""
INSERT INTO companion_channels
(companion_hash, channel_idx, name, secret, updated_at)
VALUES (?, ?, ?, ?, ?)
""", (
companion_hash,
ch.get("channel_idx", 0),
ch.get("name", ""),
ch.get("secret", b""),
now,
))
conn.commit()
return True
except Exception as e:
logger.error(f"Failed to save companion channels: {e}")
return False
def companion_load_messages(self, companion_hash: str, limit: int = 100) -> List[Dict]:
"""Load queued messages for a companion (oldest first for queue order)."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len
FROM companion_messages WHERE companion_hash = ?
ORDER BY created_at ASC LIMIT ?
""", (companion_hash, limit))
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Failed to load companion messages: {e}")
return []
def companion_push_message(self, companion_hash: str, msg: Dict) -> bool:
"""Append a message to the companion's queue."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.execute("""
INSERT INTO companion_messages
(companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
companion_hash,
msg.get("sender_key", b""),
msg.get("txt_type", 0),
msg.get("timestamp", 0),
msg.get("text", ""),
int(msg.get("is_channel", False)),
msg.get("channel_idx", 0),
msg.get("path_len", 0),
time.time(),
))
conn.commit()
return True
except Exception as e:
logger.error(f"Failed to push companion message: {e}")
return False
def companion_pop_message(self, companion_hash: str) -> Optional[Dict]:
"""Remove and return the oldest message from the companion's queue."""
try:
with sqlite3.connect(self.sqlite_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT id, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len
FROM companion_messages WHERE companion_hash = ?
ORDER BY created_at ASC LIMIT 1
""", (companion_hash,))
row = cursor.fetchone()
if not row:
return None
msg = dict(row)
conn.execute("DELETE FROM companion_messages WHERE id = ?", (msg["id"],))
conn.commit()
return {k: v for k, v in msg.items() if k != "id"}
except Exception as e:
logger.error(f"Failed to pop companion message: {e}")
return None
+23 -20
View File
@@ -197,7 +197,10 @@ class MeshCLI:
# Save config and live update
try:
self.config_manager.save_to_file()
saved, err = self.config_manager.save_to_file()
if not saved:
logger.error(f"Failed to save password: {err}")
return f"Error: Failed to save config: {err}"
self.config_manager.live_update_daemon(['security'])
return f"password now: {new_password}"
except Exception as e:
@@ -337,32 +340,32 @@ class MeshCLI:
try:
if key == "af":
self.repeater_config['airtime_factor'] = float(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
elif key == "name":
self.repeater_config['node_name'] = value
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
elif key == "repeat":
disabled = value.lower() == "off"
self.repeater_config['disable_forward'] = disabled
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return f"OK - repeat is now {'OFF' if disabled else 'ON'}"
elif key == "lat":
self.repeater_config['latitude'] = float(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
elif key == "lon":
self.repeater_config['longitude'] = float(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
@@ -379,7 +382,7 @@ class MeshCLI:
self.config['radio']['bandwidth'] = float(radio_parts[1])
self.config['radio']['spreading_factor'] = int(radio_parts[2])
self.config['radio']['coding_rate'] = int(radio_parts[3])
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['radio'])
return "OK - restart repeater to apply"
@@ -387,7 +390,7 @@ class MeshCLI:
if 'radio' not in self.config:
self.config['radio'] = {}
self.config['radio']['frequency'] = float(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['radio'])
return "OK - restart repeater to apply"
@@ -395,7 +398,7 @@ class MeshCLI:
if 'radio' not in self.config:
self.config['radio'] = {}
self.config['radio']['tx_power'] = int(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['radio'])
return "OK"
@@ -403,7 +406,7 @@ class MeshCLI:
if 'security' not in self.config:
self.config['security'] = {}
self.config['security']['guest_password'] = value
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['security'])
return "OK"
@@ -411,7 +414,7 @@ class MeshCLI:
if 'security' not in self.config:
self.config['security'] = {}
self.config['security']['allow_read_only'] = value.lower() == "on"
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['security'])
return "OK"
@@ -420,7 +423,7 @@ class MeshCLI:
if mins > 0 and (mins < 60 or mins > 240):
return "Error: interval range is 60-240 minutes"
self.repeater_config['advert_interval_minutes'] = mins
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
@@ -429,7 +432,7 @@ class MeshCLI:
if (hours > 0 and hours < 3) or hours > 48:
return "Error: interval range is 3-48 hours"
self.repeater_config['flood_advert_interval_hours'] = hours
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
@@ -438,7 +441,7 @@ class MeshCLI:
if max_val > 64:
return "Error: max 64"
self.repeater_config['max_flood_hops'] = max_val
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
@@ -447,7 +450,7 @@ class MeshCLI:
if delay < 0:
return "Error: cannot be negative"
self.repeater_config['rx_delay_base'] = delay
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater', 'delays'])
return "OK"
@@ -456,7 +459,7 @@ class MeshCLI:
if delay < 0:
return "Error: cannot be negative"
self.repeater_config['tx_delay_factor'] = delay
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater', 'delays'])
return "OK"
@@ -465,19 +468,19 @@ class MeshCLI:
if delay < 0:
return "Error: cannot be negative"
self.repeater_config['direct_tx_delay_factor'] = delay
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater', 'delays'])
return "OK"
elif key == "multi.acks":
self.repeater_config['multi_acks'] = int(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
elif key == "int.thresh":
self.repeater_config['interference_threshold'] = int(value)
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return "OK"
@@ -486,7 +489,7 @@ class MeshCLI:
# Round to nearest multiple of 4
rounded = (interval // 4) * 4
self.repeater_config['agc_reset_interval'] = rounded
self.config_manager.save_to_file()
saved, _ = self.config_manager.save_to_file()
self.config_manager.live_update_daemon(['repeater'])
return f"OK - interval rounded to {rounded}"
+147
View File
@@ -36,6 +36,8 @@ class RepeaterDaemon:
self.protocol_request_helper = None
self.acl = None
self.router = None
self.companion_bridges: dict[int, object] = {}
self.companion_frame_servers: list = []
log_level = config.get("logging", {}).get("level", "INFO")
@@ -256,6 +258,9 @@ class RepeaterDaemon:
)
logger.info("Protocol request handler initialized")
# Load companion identities (CompanionBridge + frame server per companion)
await self._load_companion_identities()
except Exception as e:
logger.error(f"Failed to initialize dispatcher: {e}")
raise
@@ -319,6 +324,136 @@ class RepeaterDaemon:
total_identities = len(self.identity_manager.list_identities())
logger.info(f"Identity manager loaded {total_identities} total identities")
async def _load_companion_identities(self) -> None:
"""Load companion identities from config and create CompanionBridge + frame server for each."""
from pymc_core import LocalIdentity
from pymc_core.companion import CompanionBridge
from pymc_core.companion.models import Contact, Channel
from repeater.companion import CompanionFrameServer
companions_config = self.config.get("identities", {}).get("companions") or []
if not companions_config:
return
sqlite_handler = None
if self.repeater_handler and self.repeater_handler.storage:
sqlite_handler = self.repeater_handler.storage.sqlite_handler
radio_config = self.repeater_handler.radio_config if self.repeater_handler else self.config.get("radio", {})
for comp_config in companions_config:
try:
name = comp_config.get("name")
identity_key = comp_config.get("identity_key")
settings = comp_config.get("settings") or {}
if not name or not identity_key:
logger.warning("Skipping companion config: missing name or identity_key")
continue
if isinstance(identity_key, str):
try:
identity_key_bytes = bytes.fromhex(identity_key)
except ValueError as e:
logger.error(f"Companion '{name}' identity_key invalid hex: {e}")
continue
elif isinstance(identity_key, bytes):
identity_key_bytes = identity_key
else:
logger.error(f"Companion '{name}' identity_key has unknown type")
continue
if len(identity_key_bytes) not in (32, 64):
logger.error(
f"Companion '{name}' identity_key must be 32 bytes (hex) or 64 bytes (MeshCore firmware key)"
)
continue
identity = LocalIdentity(seed=identity_key_bytes)
pubkey = identity.get_public_key()
companion_hash = pubkey[0]
companion_hash_str = f"{companion_hash:02x}"
node_name = settings.get("node_name", name)
tcp_port = settings.get("tcp_port", 5000)
bind_address = settings.get("bind_address", "0.0.0.0")
bridge = CompanionBridge(
identity=identity,
packet_injector=self.router.inject_packet,
node_name=node_name,
radio_config=radio_config,
)
# Load contacts from SQLite
if sqlite_handler:
contact_rows = sqlite_handler.companion_load_contacts(companion_hash_str)
if contact_rows:
records = []
for row in contact_rows:
d = dict(row)
d["public_key"] = d.pop("pubkey", d.get("public_key", b""))
records.append(d)
bridge.contacts.load_from_dicts(records)
# Load channels from SQLite
channel_rows = sqlite_handler.companion_load_channels(companion_hash_str)
for row in channel_rows:
ch = Channel(
name=row.get("name", ""),
secret=row.get("secret", b"") if isinstance(row.get("secret"), bytes) else (bytes.fromhex(row.get("secret", "")) if row.get("secret") else b""),
)
bridge.channels.set(row.get("channel_idx", 0), ch)
# Preload queued messages from SQLite into bridge
for msg_dict in sqlite_handler.companion_load_messages(companion_hash_str):
from pymc_core.companion.models import QueuedMessage
sk = msg_dict.get("sender_key", b"")
if isinstance(sk, str):
sk = bytes.fromhex(sk)
bridge.message_queue.push(QueuedMessage(
sender_key=sk,
txt_type=msg_dict.get("txt_type", 0),
timestamp=msg_dict.get("timestamp", 0),
text=msg_dict.get("text", ""),
is_channel=bool(msg_dict.get("is_channel", False)),
channel_idx=msg_dict.get("channel_idx", 0),
path_len=msg_dict.get("path_len", 0),
))
# Ensure public channel (0) exists with default key for new companions
from repeater.companion.constants import DEFAULT_PUBLIC_CHANNEL_SECRET
if bridge.get_channel(0) is None:
bridge.set_channel(0, "Public", DEFAULT_PUBLIC_CHANNEL_SECRET)
self.companion_bridges[companion_hash] = bridge
frame_server = CompanionFrameServer(
bridge=bridge,
companion_hash=companion_hash_str,
port=tcp_port,
bind_address=bind_address,
sqlite_handler=sqlite_handler,
)
await frame_server.start()
self.companion_frame_servers.append(frame_server)
self.identity_manager.register_identity(
name=name,
identity=identity,
config=comp_config,
identity_type="companion",
)
logger.info(
f"Loaded companion '{name}': hash=0x{companion_hash:02x}, "
f"port={tcp_port}, bind={bind_address}"
)
except Exception as e:
logger.error(f"Failed to load companion '{name}': {e}", exc_info=True)
def _register_identity_everywhere(
self,
name: str,
@@ -509,6 +644,18 @@ class RepeaterDaemon:
await self.dispatcher.run_forever()
except KeyboardInterrupt:
logger.info("Shutting down...")
for frame_server in getattr(self, "companion_frame_servers", []):
try:
await frame_server.stop()
except Exception as e:
logger.debug(f"Companion frame server stop: {e}")
if hasattr(self, "companion_bridges"):
for bridge in self.companion_bridges.values():
if hasattr(bridge, "stop"):
try:
await bridge.stop()
except Exception as e:
logger.debug(f"Companion bridge stop: {e}")
if self.router:
await self.router.stop()
if self.http_server:
+54 -17
View File
@@ -8,7 +8,8 @@ from pymc_core.node.handlers.login_server import LoginServerHandler
from pymc_core.node.handlers.text import TextMessageHandler
from pymc_core.node.handlers.path import PathHandler
from pymc_core.node.handlers.protocol_request import ProtocolRequestHandler
from pymc_core.node.handlers.group_text import GroupTextHandler
from pymc_core.node.handlers.protocol_response import ProtocolResponseHandler
logger = logging.getLogger("PacketRouter")
class PacketRouter:
@@ -93,36 +94,72 @@ class PacketRouter:
rssi = getattr(packet, "rssi", 0)
snr = getattr(packet, "snr", 0.0)
await self.daemon.advert_helper.process_advert_packet(packet, rssi, snr)
# Also feed adverts to companion bridges (for contact/path updates)
for bridge in getattr(self.daemon, "companion_bridges", {}).values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge advert error: {e}")
elif payload_type == LoginServerHandler.payload_type():
# Process ANON_REQ login packet for all identities
if self.daemon.login_helper:
# Route to companion if dest is a companion; else to login_helper
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif self.daemon.login_helper:
handled = await self.daemon.login_helper.process_login_packet(packet)
# Only skip forwarding if we actually handled it
if handled:
processed_by_injection = True
elif payload_type == TextMessageHandler.payload_type():
# Process TXT_MSG packet for all identities
if self.daemon.text_helper:
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif self.daemon.text_helper:
handled = await self.daemon.text_helper.process_text_packet(packet)
# Only skip forwarding if we actually handled it
if handled:
processed_by_injection = True
elif payload_type == PathHandler.payload_type():
# Process PATH packet to update client out_path for direct routing
if self.daemon.path_helper:
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif self.daemon.path_helper:
await self.daemon.path_helper.process_path_packet(packet)
# Note: process_path_packet returns False to allow forwarding
elif payload_type == ProtocolResponseHandler.payload_type():
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif payload_type == ProtocolRequestHandler.payload_type():
# Process protocol request packet (status, telemetry, neighbors, etc.)
if self.daemon.protocol_request_helper:
dest_hash = packet.payload[0] if packet.payload else None
companion_bridges = getattr(self.daemon, "companion_bridges", {})
if dest_hash is not None and dest_hash in companion_bridges:
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif self.daemon.protocol_request_helper:
handled = await self.daemon.protocol_request_helper.process_request_packet(packet)
if handled:
processed_by_injection = True
elif payload_type == GroupTextHandler.payload_type():
# GRP_TXT: pass to all companions (they filter by channel); still forward
companion_bridges = getattr(self.daemon, "companion_bridges", {})
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge GRP_TXT error: {e}")
# Only pass to repeater engine if not already processed by injection
if self.daemon.repeater_handler and not processed_by_injection:
metadata = {
+36 -18
View File
@@ -627,12 +627,15 @@ class APIEndpoints:
live_update=True,
live_update_sections=['duty_cycle']
)
if not result.get("saved", False):
return self._error(result.get("error", "Failed to save configuration to file"))
logger.info(f"Duty cycle config updated: {', '.join(applied)}")
return self._success({
"applied": applied,
"persisted": result.get("saved", False),
"persisted": True,
"live_update": result.get("live_updated", False),
"restart_required": False,
"message": "Duty cycle settings applied immediately."
@@ -1126,8 +1129,10 @@ class APIEndpoints:
self.config["radio"]["cad"]["min_threshold"] = min_val
config_path = getattr(self, '_config_path', '/etc/pymc_repeater/config.yaml')
self.config_manager.save_to_file()
saved, err = self.config_manager.save_to_file()
if not saved:
return self._error(err or "Failed to save configuration to file")
logger.info(f"Saved CAD settings to config: peak={peak}, min={min_val}, rate={detection_rate:.1f}%")
return {
"success": True,
@@ -1330,12 +1335,15 @@ class APIEndpoints:
live_update=True,
live_update_sections=live_sections
)
if not result.get("saved", False):
return self._error(result.get("error", "Failed to save configuration to file"))
logger.info(f"Radio config updated: {', '.join(applied)}")
return self._success({
"applied": applied,
"persisted": result.get("saved", False),
"persisted": True,
"live_update": result.get("live_updated", False),
"restart_required": not result.get("live_updated", False),
"message": "Settings applied immediately." if result.get("live_updated") else "Settings saved. Restart service to apply changes."
@@ -1649,8 +1657,12 @@ class APIEndpoints:
# Update the configuration file using ConfigManager
try:
self.config_manager.save_to_file()
logger.info(f"Updated running config and saved global flood policy to file: {'allow' if global_flood_allow else 'deny'}")
saved, err = self.config_manager.save_to_file()
if saved:
logger.info(f"Updated running config and saved global flood policy to file: {'allow' if global_flood_allow else 'deny'}")
else:
logger.error(f"Failed to save global flood policy to file: {err}")
return self._error(err or "Failed to save configuration to file")
except Exception as e:
logger.error(f"Failed to save global flood policy to file: {e}")
return self._error(f"Failed to save configuration to file: {e}")
@@ -2006,8 +2018,10 @@ class APIEndpoints:
self.config["identities"]["room_servers"] = room_servers
# Save to file
self.config_manager.save_to_file()
saved, err = self.config_manager.save_to_file()
if not saved:
return self._error(err or "Failed to save configuration to file")
logger.info(f"Created new identity: {name} (type: {identity_type}){' with auto-generated key' if key_was_generated else ''}")
# Hot reload - register identity immediately
@@ -2152,9 +2166,11 @@ class APIEndpoints:
# Save to config
room_servers[identity_index] = identity
self.config["identities"]["room_servers"] = room_servers
self.config_manager.save_to_file()
saved, err = self.config_manager.save_to_file()
if not saved:
return self._error(err or "Failed to save configuration to file")
logger.info(f"Updated identity: {name}")
# Hot reload - re-register identity if key changed or name changed
@@ -2249,9 +2265,11 @@ class APIEndpoints:
# Update config
self.config["identities"]["room_servers"] = room_servers
self.config_manager.save_to_file()
saved, err = self.config_manager.save_to_file()
if not saved:
return self._error(err or "Failed to save configuration to file")
logger.info(f"Deleted identity: {name}")
unregister_success = False
+2 -1
View File
@@ -435,7 +435,8 @@ class AuthEndpoints:
# Save to config file using ConfigManager
if self.config_manager:
if self.config_manager.save_to_file():
saved, _ = self.config_manager.save_to_file()
if saved:
logger.info(f"Admin password changed successfully by user {user['username']}")
return json.dumps({
'success': True,