mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
Reapply refactor from ce8381a (replace monolithic FrameServer with thin pymc_core subclass, re-export constants, SQLite persistence hooks) while preserving pre-refactor whitespace where patch applied cleanly. Remaining files match refactor commit exactly. Diff vs ce8381a is whitespace-only. Co-authored-by: Cursor <cursoragent@cursor.com>
559 lines
21 KiB
Python
559 lines
21 KiB
Python
"""
|
|
Text message (TXT_MSG) handling helper for pyMC Repeater.
|
|
|
|
This module processes incoming text messages for all managed identities
|
|
(repeater identity + identity manager identities).
|
|
Also handles CLI commands for admin users on the repeater identity.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import struct
|
|
import time
|
|
|
|
from pymc_core.node.handlers.text import TextMessageHandler
|
|
|
|
from .mesh_cli import MeshCLI
|
|
from .room_server import RoomServer
|
|
|
|
logger = logging.getLogger("TextHelper")
|
|
|
|
# Text message type flags
|
|
TXT_TYPE_PLAIN = 0x00
|
|
TXT_TYPE_CLI_DATA = 0x01
|
|
|
|
|
|
class TextHelper:
|
|
|
|
def __init__(
|
|
self,
|
|
identity_manager,
|
|
packet_injector=None,
|
|
acl_dict=None,
|
|
log_fn=None,
|
|
config_path: str = None,
|
|
config: dict = None,
|
|
config_manager=None,
|
|
sqlite_handler=None,
|
|
send_advert_callback=None,
|
|
):
|
|
|
|
self.identity_manager = identity_manager
|
|
self.packet_injector = packet_injector
|
|
self.log_fn = log_fn or logger.info
|
|
self.acl_dict = acl_dict or {} # Per-identity ACLs keyed by hash_byte
|
|
self.sqlite_handler = sqlite_handler # For room server database operations
|
|
self.send_advert_callback = send_advert_callback # Callback to send repeater advert
|
|
|
|
# Dictionary of handlers keyed by dest_hash
|
|
self.handlers = {}
|
|
|
|
# Dictionary of room servers keyed by dest_hash
|
|
self.room_servers = {}
|
|
|
|
# Track repeater identity for CLI commands
|
|
self.repeater_hash = None
|
|
|
|
# Store config for later use
|
|
self.config_path = config_path
|
|
self.config = config
|
|
self.config_manager = config_manager
|
|
|
|
# Store for later CLI initialization (needs identity and storage)
|
|
self.config_path = config_path
|
|
self.config = config
|
|
|
|
# Initialize CLI handler later when repeater identity is registered
|
|
self.cli = None
|
|
|
|
def register_identity(
|
|
self, name: str, identity, identity_type: str = "room_server", radio_config=None
|
|
):
|
|
|
|
hash_byte = identity.get_public_key()[0]
|
|
|
|
# Get ACL for this identity
|
|
identity_acl = self.acl_dict.get(hash_byte)
|
|
if not identity_acl:
|
|
logger.warning(f"Cannot register identity '{name}': no ACL for hash 0x{hash_byte:02X}")
|
|
return
|
|
|
|
# Create a contacts wrapper from this identity's ACL
|
|
acl_contacts = self._create_acl_contacts_wrapper(identity_acl)
|
|
|
|
# Create TextMessageHandler for this identity
|
|
handler = TextMessageHandler(
|
|
local_identity=identity,
|
|
contacts=acl_contacts,
|
|
log_fn=self.log_fn,
|
|
send_packet_fn=self._send_packet,
|
|
radio_config=radio_config,
|
|
)
|
|
|
|
# Register by dest hash
|
|
hash_byte = identity.get_public_key()[0]
|
|
self.handlers[hash_byte] = {
|
|
"handler": handler,
|
|
"identity": identity,
|
|
"name": name,
|
|
"type": identity_type,
|
|
}
|
|
|
|
# Track repeater identity for CLI commands
|
|
if identity_type == "repeater":
|
|
self.repeater_hash = hash_byte
|
|
logger.info(f"Set repeater hash for CLI: 0x{hash_byte:02X}")
|
|
|
|
# Initialize CLI handler now that we have the repeater identity
|
|
if self.config_path and self.config and self.config_manager:
|
|
self.cli = MeshCLI(
|
|
self.config_path,
|
|
self.config,
|
|
self.config_manager,
|
|
identity_type="repeater",
|
|
enable_regions=True,
|
|
send_advert_callback=self.send_advert_callback,
|
|
identity=identity,
|
|
storage_handler=self.sqlite_handler,
|
|
)
|
|
logger.info(
|
|
"Initialized CLI handler for repeater commands with identity and storage"
|
|
)
|
|
|
|
# Create RoomServer instance for room_server identities
|
|
if identity_type == "room_server" and self.sqlite_handler:
|
|
try:
|
|
from .room_server import MAX_UNSYNCED_POSTS
|
|
|
|
room_config = radio_config or {}
|
|
max_posts = room_config.get("max_posts", MAX_UNSYNCED_POSTS)
|
|
|
|
# Enforce hard limit
|
|
if max_posts > MAX_UNSYNCED_POSTS:
|
|
logger.warning(
|
|
f"Room '{name}': Configured max_posts={max_posts} exceeds hard limit "
|
|
f"of {MAX_UNSYNCED_POSTS}, capping to {MAX_UNSYNCED_POSTS}"
|
|
)
|
|
max_posts = MAX_UNSYNCED_POSTS
|
|
|
|
room_server = RoomServer(
|
|
room_hash=hash_byte,
|
|
room_name=name,
|
|
local_identity=identity,
|
|
sqlite_handler=self.sqlite_handler,
|
|
packet_injector=self.packet_injector,
|
|
acl=identity_acl,
|
|
max_posts=max_posts,
|
|
config_path=self.config_path,
|
|
config=self.config,
|
|
config_manager=self.config_manager,
|
|
)
|
|
|
|
self.room_servers[hash_byte] = room_server
|
|
|
|
# Start sync loop
|
|
asyncio.create_task(room_server.start())
|
|
|
|
logger.info(
|
|
f"Registered room server '{name}': hash=0x{hash_byte:02X}, "
|
|
f"max_posts={max_posts}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to create room server '{name}': {e}", exc_info=True)
|
|
|
|
logger.info(f"Registered {identity_type} '{name}' text handler: hash=0x{hash_byte:02X}")
|
|
|
|
def _create_acl_contacts_wrapper(self, acl):
|
|
|
|
class ACLContactsWrapper:
|
|
def __init__(self, identity_acl):
|
|
self._acl = identity_acl
|
|
|
|
@property
|
|
def contacts(self):
|
|
contact_list = []
|
|
for client_info in self._acl.get_all_clients():
|
|
# Create a minimal contact object that TextMessageHandler needs
|
|
class ContactProxy:
|
|
def __init__(self, client):
|
|
self.public_key = client.id.get_public_key().hex()
|
|
self.name = f"client_{self.public_key[:8]}"
|
|
|
|
contact_list.append(ContactProxy(client_info))
|
|
return contact_list
|
|
|
|
return ACLContactsWrapper(acl)
|
|
|
|
async def process_text_packet(self, packet):
|
|
|
|
try:
|
|
if len(packet.payload) < 2:
|
|
return False
|
|
|
|
dest_hash = packet.payload[0]
|
|
src_hash = packet.payload[1]
|
|
|
|
handler_info = self.handlers.get(dest_hash)
|
|
if handler_info:
|
|
logger.debug(
|
|
f"Routing text message to '{handler_info['name']}': "
|
|
f"dest=0x{dest_hash:02X}, src=0x{src_hash:02X}"
|
|
)
|
|
|
|
# Let handler decrypt the message first
|
|
await handler_info["handler"](packet)
|
|
|
|
# Call placeholder for custom processing
|
|
await self._on_message_received(
|
|
identity_name=handler_info["name"],
|
|
identity_type=handler_info["type"],
|
|
packet=packet,
|
|
dest_hash=dest_hash,
|
|
src_hash=src_hash,
|
|
)
|
|
|
|
# Mark packet as handled
|
|
packet.mark_do_not_retransmit()
|
|
return True
|
|
else:
|
|
logger.debug(f"No text handler for hash 0x{dest_hash:02X}, allowing forward")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing text packet: {e}")
|
|
return False
|
|
|
|
async def _on_message_received(
|
|
self,
|
|
identity_name: str,
|
|
identity_type: str,
|
|
packet,
|
|
dest_hash: int,
|
|
src_hash: int,
|
|
):
|
|
|
|
# Placeholder - can be overridden or callback can be added
|
|
logger.debug(
|
|
f"Message received for {identity_type} '{identity_name}' " f"from 0x{src_hash:02X}"
|
|
)
|
|
|
|
# Extract decrypted message if available
|
|
if hasattr(packet, "decrypted") and packet.decrypted:
|
|
message_text = packet.decrypted.get("text", "<unknown>")
|
|
|
|
# Clean message text - remove null bytes and trailing whitespace
|
|
message_text = message_text.rstrip("\x00").rstrip()
|
|
|
|
logger.info(f"[{identity_type}:{identity_name}] Message: {message_text}")
|
|
|
|
# Handle room server messages
|
|
if identity_type == "room_server" and dest_hash in self.room_servers:
|
|
room_server = self.room_servers[dest_hash]
|
|
|
|
# Check if this is a CLI command FIRST (before storing as post)
|
|
if self._is_cli_command(message_text):
|
|
# Handle CLI command - do NOT store as post
|
|
if room_server and room_server.cli:
|
|
try:
|
|
# Check admin permission
|
|
is_admin = self._check_admin_permission_for_identity(
|
|
src_hash, dest_hash
|
|
)
|
|
|
|
if not is_admin:
|
|
logger.warning(
|
|
f"Room '{identity_name}': CLI command denied from 0x{src_hash:02X} (not admin)"
|
|
)
|
|
return
|
|
|
|
# Get sender's full pubkey
|
|
identity_acl = self.acl_dict.get(dest_hash)
|
|
sender_pubkey = bytes([src_hash]) + b"\x00" * 31 # Default
|
|
if identity_acl:
|
|
for client_info in identity_acl.get_all_clients():
|
|
if client_info.id.get_public_key()[0] == src_hash:
|
|
sender_pubkey = client_info.id.get_public_key()
|
|
break
|
|
|
|
# Handle CLI command
|
|
reply = room_server.cli.handle_command(
|
|
sender_pubkey=sender_pubkey, command=message_text, is_admin=is_admin
|
|
)
|
|
|
|
logger.info(
|
|
f"Room '{identity_name}': CLI command from 0x{src_hash:02X}: {message_text[:50]} -> {reply[:100]}"
|
|
)
|
|
|
|
# Send reply back to sender
|
|
handler_info = self.handlers.get(dest_hash)
|
|
if handler_info:
|
|
await self._send_cli_reply(packet, reply, handler_info)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing room server CLI command: {e}", exc_info=True
|
|
)
|
|
|
|
# CLI command handled, don't store as post
|
|
return
|
|
|
|
# NOT a CLI command - store as regular room post
|
|
try:
|
|
# Get sender's full pubkey
|
|
identity_acl = self.acl_dict.get(dest_hash)
|
|
sender_pubkey = bytes([src_hash]) + b"\x00" * 31 # Default
|
|
if identity_acl:
|
|
for client_info in identity_acl.get_all_clients():
|
|
if client_info.id.get_public_key()[0] == src_hash:
|
|
sender_pubkey = client_info.id.get_public_key()
|
|
break
|
|
|
|
# Store message as post
|
|
sender_timestamp = int(time.time())
|
|
success = await room_server.add_post(
|
|
client_pubkey=sender_pubkey,
|
|
message_text=message_text,
|
|
sender_timestamp=sender_timestamp,
|
|
txt_type=TXT_TYPE_PLAIN,
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
f"Room '{identity_name}': New post from {sender_pubkey[:4].hex()}: {message_text[:50]}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error storing room post: {e}", exc_info=True)
|
|
|
|
return
|
|
|
|
# Check if this is a CLI command to the repeater (AFTER decryption)
|
|
if dest_hash == self.repeater_hash and self.cli and self._is_cli_command(message_text):
|
|
try:
|
|
# Check admin permission
|
|
is_admin = self._check_admin_permission_for_identity(
|
|
src_hash, self.repeater_hash
|
|
)
|
|
|
|
# If not admin, log and return without sending reply
|
|
if not is_admin:
|
|
logger.warning(
|
|
f"CLI command denied from 0x{src_hash:02X} (not admin): {message_text[:50]}"
|
|
)
|
|
return
|
|
|
|
# Get client for full public key
|
|
repeater_acl = self.acl_dict.get(self.repeater_hash)
|
|
sender_pubkey = bytes([src_hash]) + b"\x00" * 31 # Default
|
|
if repeater_acl:
|
|
for client_info in repeater_acl.get_all_clients():
|
|
if client_info.id.get_public_key()[0] == src_hash:
|
|
sender_pubkey = client_info.id.get_public_key()
|
|
break
|
|
|
|
# Handle CLI command
|
|
reply = self.cli.handle_command(
|
|
sender_pubkey=sender_pubkey, command=message_text, is_admin=is_admin
|
|
)
|
|
|
|
logger.info(
|
|
f"CLI command from 0x{src_hash:02X}: {message_text[:50]} -> {reply[:100]}"
|
|
)
|
|
|
|
# Send reply back to sender
|
|
handler_info = self.handlers.get(dest_hash)
|
|
if handler_info:
|
|
await self._send_cli_reply(packet, reply, handler_info)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing CLI command: {e}", exc_info=True)
|
|
|
|
async def _send_packet(self, packet, wait_for_ack: bool = False):
|
|
|
|
if self.packet_injector:
|
|
try:
|
|
return await self.packet_injector(packet, wait_for_ack=wait_for_ack)
|
|
except Exception as e:
|
|
logger.error(f"Error sending packet: {e}")
|
|
return False
|
|
else:
|
|
logger.error("No packet injector configured, cannot send packet")
|
|
return False
|
|
|
|
def set_message_callback(self, callback):
|
|
|
|
self._message_callback = callback
|
|
|
|
def list_registered_identities(self):
|
|
|
|
return [
|
|
{
|
|
"hash": hash_byte,
|
|
"name": info["name"],
|
|
"type": info["type"],
|
|
}
|
|
for hash_byte, info in self.handlers.items()
|
|
]
|
|
|
|
async def cleanup(self):
|
|
"""Cleanup room servers and handlers."""
|
|
# Stop all room server sync loops
|
|
for room_server in self.room_servers.values():
|
|
try:
|
|
await room_server.stop()
|
|
except Exception as e:
|
|
logger.error(f"Error stopping room server: {e}")
|
|
|
|
logger.info("TextHelper cleanup complete")
|
|
|
|
def _is_cli_command(self, message: str) -> bool:
|
|
"""Check if message looks like a CLI command."""
|
|
# Strip optional sequence prefix (XX|)
|
|
if len(message) > 4 and message[2] == "|":
|
|
message = message[3:].strip()
|
|
|
|
# Check for known command prefixes
|
|
command_prefixes = [
|
|
"get ",
|
|
"set ",
|
|
"reboot",
|
|
"advert",
|
|
"clock",
|
|
"time ",
|
|
"password ",
|
|
"clear ",
|
|
"ver",
|
|
"board",
|
|
"neighbors",
|
|
"neighbor.",
|
|
"tempradio ",
|
|
"setperm ",
|
|
"region",
|
|
"sensor ",
|
|
"gps",
|
|
"log ",
|
|
"stats-",
|
|
"start ota",
|
|
]
|
|
|
|
return any(message.startswith(prefix) for prefix in command_prefixes)
|
|
|
|
def _check_admin_permission(self, src_hash: int) -> bool:
|
|
"""Check if sender has admin permissions for repeater (legacy method)."""
|
|
return self._check_admin_permission_for_identity(src_hash, self.repeater_hash)
|
|
|
|
def _check_admin_permission_for_identity(self, src_hash: int, identity_hash: int) -> bool:
|
|
"""Check if sender has admin permissions (bit 0x02) for a specific identity."""
|
|
# Get the identity's ACL
|
|
identity_acl = self.acl_dict.get(identity_hash)
|
|
if not identity_acl:
|
|
return False
|
|
|
|
# Get client by hash byte
|
|
clients = identity_acl.get_all_clients()
|
|
for client_info in clients:
|
|
pubkey = client_info.id.get_public_key()
|
|
if pubkey[0] == src_hash:
|
|
# Check admin bit (0x02 = PERM_ACL_ADMIN)
|
|
permissions = getattr(client_info, "permissions", 0)
|
|
PERM_ACL_ADMIN = 0x02
|
|
return (permissions & 0x02) == PERM_ACL_ADMIN
|
|
|
|
return False
|
|
|
|
async def _send_cli_reply(self, original_packet, reply_text: str, handler_info: dict):
|
|
"""
|
|
Send CLI reply back to sender using TXT_MSG datagram.
|
|
|
|
Follows the C++ pattern (lines 603-609 in MyMesh.cpp):
|
|
- Creates TXT_MSG datagram with TXT_TYPE_CLI_DATA flag
|
|
- Encrypts with shared secret from ACL client
|
|
- Uses client->out_path_len to decide routing:
|
|
* if out_path_len < 0: sendFlood()
|
|
* else: sendDirect() with stored out_path
|
|
"""
|
|
import time
|
|
|
|
from pymc_core.protocol import Identity, PacketBuilder
|
|
from pymc_core.protocol.constants import PAYLOAD_TYPE_TXT_MSG
|
|
|
|
try:
|
|
src_hash = original_packet.payload[1]
|
|
dest_hash = original_packet.payload[0]
|
|
|
|
incoming_route = original_packet.get_route_type()
|
|
logger.debug(
|
|
f"CLI reply: original packet dest=0x{dest_hash:02X}, src=0x{src_hash:02X}, incoming_route={incoming_route}"
|
|
)
|
|
|
|
# Find the client in the DESTINATION identity's ACL (not always repeater!)
|
|
# dest_hash is the identity that received the command (repeater OR room server)
|
|
identity_acl = self.acl_dict.get(dest_hash)
|
|
if not identity_acl:
|
|
logger.error(f"No ACL found for identity 0x{dest_hash:02X} for CLI reply")
|
|
return
|
|
|
|
client = None
|
|
for client_info in identity_acl.get_all_clients():
|
|
pubkey = client_info.id.get_public_key()
|
|
if pubkey[0] == src_hash:
|
|
client = client_info
|
|
break
|
|
|
|
if not client:
|
|
logger.error(
|
|
f"Client 0x{src_hash:02X} not found in identity 0x{dest_hash:02X} ACL for CLI reply"
|
|
)
|
|
return
|
|
|
|
# Get shared secret from client
|
|
shared_secret = client.shared_secret
|
|
if not shared_secret or len(shared_secret) == 0:
|
|
logger.error(f"No shared secret for client 0x{src_hash:02X}")
|
|
return
|
|
|
|
# Build reply packet payload
|
|
# Format: timestamp(4) + flags(1) + reply_text
|
|
timestamp = int(time.time())
|
|
TXT_TYPE_CLI_DATA = 0x01
|
|
flags = TXT_TYPE_CLI_DATA << 2 # Upper 6 bits are txt_type
|
|
|
|
reply_bytes = reply_text.encode("utf-8")
|
|
plaintext = timestamp.to_bytes(4, "little") + bytes([flags]) + reply_bytes
|
|
|
|
# Decide routing based on client->out_path_len (C++ pattern)
|
|
# out_path is populated by PATH packets, NOT from incoming text message route
|
|
route_type = "flood" if client.out_path_len < 0 else "direct"
|
|
logger.debug(
|
|
f"CLI reply: client.out_path_len={client.out_path_len}, using route_type={route_type}"
|
|
)
|
|
|
|
reply_packet = PacketBuilder.create_datagram(
|
|
ptype=PAYLOAD_TYPE_TXT_MSG,
|
|
dest=client.id,
|
|
local_identity=handler_info["identity"],
|
|
secret=shared_secret,
|
|
plaintext=plaintext,
|
|
route_type=route_type,
|
|
)
|
|
|
|
# Add path for direct routing if available from PATH packets
|
|
if client.out_path_len >= 0 and len(client.out_path) > 0:
|
|
reply_packet.path = bytearray(client.out_path[: client.out_path_len])
|
|
reply_packet.path_len = client.out_path_len
|
|
logger.debug(
|
|
f"CLI reply: Added stored out_path - path_len={reply_packet.path_len}, path={[hex(b) for b in reply_packet.path]}"
|
|
)
|
|
|
|
# Send with delay (CLI_REPLY_DELAY_MILLIS = 600ms in C++)
|
|
CLI_REPLY_DELAY_MS = 600
|
|
await asyncio.sleep(CLI_REPLY_DELAY_MS / 1000.0)
|
|
|
|
await self._send_packet(reply_packet, wait_for_ack=False)
|
|
logger.info(
|
|
f"CLI reply sent to 0x{src_hash:02X} via {route_type.upper()}: {reply_text[:50]}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending CLI reply: {e}", exc_info=True)
|